Designing Agentic AI framework for Autonomous BGP Path Manipulation and SLA Protection in Enterprise Network

By Admin | 21-05-2026

Abstract:

From the standpoint of enterprise and service provider production networks, BGP path manipulation is perhaps among the most important of functions due to its intimate control over ingress, egress, and flow through their network infrastructure. BGP attributes that influence routing decision for traffic engineering, redundancy, load balancing based on different makeup of a path & latency, disaster recovery using Local Preference, AS Path Prepending, MED which stands for Multi-Exit Discriminator/Weight & Community values. In large customer-production environments, wrong BGP path selection can create routing loops, asymmetric routing, congestion, and packet losses impacting service levels leading to SLA violations and even the complete outage of services affecting customers and business-critical applications. As a result, network engineers must consider carefully any change of policy to adopt it anywhere. Here, it is where —Agentic AI enables— the biggest speedup: it constantly analyses real-time telemetry, BGP updates, historical incidents & network topology data to automatically/syn-autonomously make intelligent routing decisions. Powered by Digital Twin Simulation and AI Insights, the agent predicts the effects of BGP policy changes before operationalizing them, then monitors anomalous route behavior by detecting unstable peers, and finally proposes routing paths with maximum stability and minimum risk. This allows dynamic adjusting of Local Preference, intelligent traffic rerouting after congestion detection, rampant route leak prevention and decreased blast radius in case of failure-all while continuing to meet SLA. With observability, simulation and autonomous decision-making, Agentic AI changes a legacy reactive approach to BGP operations into one that is proactive, self-healing with extremely resilient network management.

Scenario: Brayan is working as a Network Optimization Engineer at Wipro Telecom in an enterprise network environment. Based on a customer requirement, BGP path manipulation needs to be implemented to optimize traffic flow across the network. Currently, one network path is experiencing continuously increasing traffic utilization, while another segment is impacted due to an OFC (Optical Fiber Cable) cut, resulting in congestion, instability, and potential SLA degradation. To provide a better resolution, the enterprise network must intelligently reroute traffic by modifying BGP attributes such as Local Preference, AS Path Prepending, MED, or Community values to balance traffic and ensure high availability. Traditionally, this process requires manual analysis and configuration by network engineers, which can increase response time during critical incidents. However, with Agentic AI, the system can automatically analyse real-time telemetry, detect congestion and link failures, simulate the impact of routing changes using a Digital Twin topology, and autonomously perform optimized BGP path manipulation. This enables faster convergence, reduced downtime, improved traffic engineering, SLA protection, and intelligent self-healing operations. Let us understand this concept using a simple network topology example.

Here R1,R2 = Edge router of network

R3 = Route-reflector of network

R4 &R5 = DC-spine device 

ISP-A =Jio

ISP-B = Idea

ISP-C = Vodafone 

Phase 1 — The Telemetry & Observability Foundation 

It lays the groundwork for AI-enabled network operations to monitor in real time and across data silos. Simulates gNMI streaming telemetry from BGP routers as configured with OpenConfig YANG models to export metrics about network statistics, routing updates, CPU usage and protocol health. A BMP collector listens for BGP route advertisements, withdrawals and peer state changes. We stream all telemetry data through a Kafka-style event bus and store it in an InfluxDB-style time-series database for later use to analyze what has occurred and tips on troubleshooting. The feature extraction modules convert this raw telemetry into low-dimensional AI-ready datasets that serve as input to a wide range of ML algorithms and models like anomaly detection, predictive analytics, root-cause analysis, autonomous remediation etc. And in prod, all of the agents are replaced by actual technologies such as pygnmi / gnmi-py, GoBMP, Confluent Kafka and InfluxDB clients to provide a scalable and reliable network observability.

phase1_telemetry/telemetry_pipeline.py

"""

import time
import random
import json
import threading
import queue
from collections import defaultdict, deque
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Callable, Optional
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from common.models import (
   BGPPeer, BGPRoute, RouterTelemetry, BGPState, NetworkStateSnapshot
)
from common.logger import get_logger

log = get_logger("Phase1-Telemetry", phase=1)


# ─────────────────────────────────────────────────────────────────────────────
# 1.1  Network Topology Definition
# ─────────────────────────────────────────────────────────────────────────────

TOPOLOGY = {
   "routers": [
       {"id": "R1", "hostname": "edge-router-01", "role": "edge",    "asn": 65001},
       {"id": "R2", "hostname": "edge-router-02", "role": "edge",    "asn": 65001},
       {"id": "R3", "hostname": "rr-01",          "role": "reflector","asn": 65001},
       {"id": "R4", "hostname": "dc-spine-01",    "role": "spine",   "asn": 65001},
       {"id": "R5", "hostname": "dc-spine-02",    "role": "spine",   "asn": 65001},
   ],
   "peers": [
       {"local": "R1", "remote": "ISP-A", "remote_asn": 1234, "ebgp": True},
       {"local": "R1", "remote": "ISP-B", "remote_asn": 5678, "ebgp": True},
        {"local": "R2", "remote": "ISP-B", "remote_asn": 3312, "ebgp": True}
       {"local": "R2", "remote": "ISP-C", "remote_asn": 9012, "ebgp": True},
       {"local": "R1", "remote": "R3",    "remote_asn": 65001,"ebgp": False},
       {"local": "R2", "remote": "R3",    "remote_asn": 65001,"ebgp": False},
       {"local": "R3", "remote": "R4",    "remote_asn": 65001,"ebgp": False},
       {"local": "R3", "remote": "R5",    "remote_asn": 65001,"ebgp": False},
   ],
   "isps": {
       "ISP-A": {"bandwidth_gbps": 10, "cost_per_gb": 0.02, "base_latency_ms": 8},
       "ISP-B": {"bandwidth_gbps": 10, "cost_per_gb": 0.015,"base_latency_ms": 12},
       "ISP-C": {"bandwidth_gbps": 5,  "cost_per_gb": 0.025,"base_latency_ms": 6},
   }
}

SAMPLE_PREFIXES = [
   "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16",
   "8.8.8.0/24", "1.1.1.0/24",   "203.0.113.0/24",
   "198.51.100.0/24", "100.64.0.0/10",
]


# ─────────────────────────────────────────────────────────────────────────────
# 1.2  gNMI Streaming Telemetry Simulator
# ─────────────────────────────────────────────────────────────────────────────

class GNMISimulator:
   """
   Simulates gNMI model-driven streaming telemetry.

   Production equivalent:
       from pygnmi.client import gNMIclient
       with gNMIclient(target=('router_ip', 57400), ...) as gc:
           for update in gc.subscribe(...):
               process(update)
   """

   def __init__(self, router_config: dict):
       self.router_id   = router_config["id"]
       self.hostname    = router_config["hostname"]
       self.asn         = router_config["asn"]
       self._running    = False
       self._fault_prob = 0.03   # 3 % chance of a peer flap per cycle

       # Per-peer BGP sessions
       self.peers: List[BGPPeer] = []
       for p in TOPOLOGY["peers"]:
           if p["local"] == self.router_id:
               self.peers.append(BGPPeer(
                   peer_id    = p["remote"],
                   peer_asn   = p["remote_asn"],
                   local_asn  = self.asn,
                   peer_ip    = f"10.0.{random.randint(1,254)}.1",
                   local_ip   = f"10.0.{random.randint(1,254)}.2",
                   is_ebgp    = p["ebgp"],
                   state      = BGPState.ESTABLISHED,
                   prefixes_received = random.randint(10, 800_000),
               ))

   def _simulate_state(self) -> RouterTelemetry:
       """Generate one telemetry snapshot (OpenConfig YANG equivalent)."""
       # Randomly inject fault
       for peer in self.peers:
           if random.random() < self._fault_prob:
               peer.state = random.choice([BGPState.DOWN, BGPState.ACTIVE])
               log.warning(f"  [gNMI] {self.hostname}: peer {peer.peer_id} → {peer.state.value}")
           elif peer.state != BGPState.ESTABLISHED:
               peer.state = BGPState.ESTABLISHED  # recover

       # Generate RIB sample
       rib = []
       for prefix in random.sample(SAMPLE_PREFIXES, min(4, len(SAMPLE_PREFIXES))):
           rib.append(BGPRoute(
               prefix      = prefix,
               next_hop    = f"10.0.{random.randint(1,254)}.1",
               as_path     = [self.asn] + [random.randint(1000, 65000) for _ in range(random.randint(1,4))],
               local_pref  = random.choice([50, 100, 150, 200]),
               med         = random.randint(0, 200),
               communities = [f"65001:{random.randint(100,199)}"],
               is_best     = random.random() > 0.4,
               peer_id     = random.choice(self.peers).peer_id if self.peers else "",
           ))

       return RouterTelemetry(
           router_id             = self.router_id,
           hostname              = self.hostname,
           timestamp             = time.time(),
           bgp_peers             = list(self.peers),
           rib_entries           = rib,
           cpu_percent           = random.uniform(5, 75),
           memory_percent        = random.uniform(20, 85),
           interface_utilisation = {
               "GigE0/0": random.uniform(10, 95),
               "GigE0/1": random.uniform(5, 80),
           },
           latency_ms            = {
               "ISP-A": random.gauss(8,  1.5),
               "ISP-B": random.gauss(12, 2.0),
               "ISP-C": random.gauss(6,  1.0),
           },
           packet_loss_percent   = {
               "ISP-A": max(0, random.gauss(0.1, 0.05)),
               "ISP-B": max(0, random.gauss(0.2, 0.1)),
               "ISP-C": max(0, random.gauss(0.05, 0.02)),
           },
       )

   def stream(self, callback: Callable, interval: float = 1.0, max_cycles: int = 5):
       """Stream telemetry updates to callback (simulates gNMI subscription)."""
       self._running = True
       cycles = 0
       log.info(f"[gNMI] Starting stream from {self.hostname} (interval={interval}s)")
       while self._running and cycles < max_cycles:
           snapshot = self._simulate_state()
           callback(snapshot)
           cycles += 1
           time.sleep(interval)
       self._running = False

   def stop(self):
       self._running = False


# ─────────────────────────────────────────────────────────────────────────────
# 1.3  BMP Collector (RFC 7854)
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class BMPMessage:
   """Simulated BMP route-monitoring message."""
   msg_type: str          # route_monitor | peer_up | peer_down | stats
   router_id: str
   peer_id: str
   peer_asn: int
   timestamp: float
   prefix: Optional[str]  = None
   withdrawn: bool        = False
   as_path: List[int]     = field(default_factory=list)
   communities: List[str] = field(default_factory=list)
   local_pref: int        = 100


class BMPCollector:
   """
   Simulates a BMP session collector (gobmp / pmacct equivalent).

   Production equivalent:
       - Listen on TCP 17442 for BMP sessions
       - Parse RFC 7854 message types
       - Forward to Kafka topic 'bgp.bmp.updates'
   """

   def __init__(self):
       self._messages: List[BMPMessage] = []

   def ingest(self, telemetry: RouterTelemetry):
       """Convert gNMI telemetry into BMP-style messages."""
       for peer in telemetry.bgp_peers:
           msg_type = "route_monitor" if peer.state == BGPState.ESTABLISHED else "peer_down"
           self._messages.append(BMPMessage(
               msg_type  = msg_type,
               router_id = telemetry.router_id,
               peer_id   = peer.peer_id,
               peer_asn  = peer.peer_asn,
               timestamp = telemetry.timestamp,
           ))
       for route in telemetry.rib_entries:
           self._messages.append(BMPMessage(
               msg_type   = "route_monitor",
               router_id  = telemetry.router_id,
               peer_id    = route.peer_id,
               peer_asn   = 0,
               timestamp  = telemetry.timestamp,
               prefix     = route.prefix,
               withdrawn  = False,
               as_path    = route.as_path,
               communities= route.communities,
               local_pref = route.local_pref,
           ))

   def get_recent(self, n: int = 20) -> List[BMPMessage]:
       return self._messages[-n:]

   def stats(self) -> dict:
       route_msgs = [m for m in self._messages if m.prefix]
       peer_msgs  = [m for m in self._messages if not m.prefix]
       return {
           "total_messages":  len(self._messages),
           "route_updates":   len(route_msgs),
           "peer_events":     len(peer_msgs),
           "unique_prefixes": len({m.prefix for m in route_msgs if m.prefix}),
       }


# ─────────────────────────────────────────────────────────────────────────────
# 1.4  In-Memory Kafka Bus (simulates confluent-kafka)
# ─────────────────────────────────────────────────────────────────────────────

class InMemoryKafka:
   """
   Simulates a Kafka broker for demo purposes.

   Production equivalent:
       from confluent_kafka import Producer, Consumer
       producer = Producer({'bootstrap.servers': 'kafka:9092'})
       producer.produce('bgp.telemetry', json.dumps(event))
   """

   TOPICS = [
       "bgp.telemetry.gnmi",
       "bgp.telemetry.bmp",
       "bgp.events.peer",
       "bgp.events.prefix",
       "bgp.agent.actions",
       "bgp.agent.outcomes",
   ]

   def __init__(self, max_per_topic: int = 10_000):
       self._store: Dict[str, deque] = {t: deque(maxlen=max_per_topic) for t in self.TOPICS}
       self._offsets: Dict[str, int] = {t: 0 for t in self.TOPICS}
       self._lock = threading.Lock()

   def produce(self, topic: str, value: dict):
       if topic not in self._store:
           self._store[topic] = deque(maxlen=10_000)
           self._offsets[topic] = 0
       with self._lock:
           self._store[topic].append({"offset": self._offsets[topic], "value": value, "ts": time.time()})
           self._offsets[topic] += 1

   def consume(self, topic: str, n: int = 10) -> List[dict]:
       with self._lock:
           msgs = list(self._store.get(topic, []))
           return msgs[-n:]

   def topic_stats(self) -> dict:
       return {t: len(msgs) for t, msgs in self._store.items()}


# ─────────────────────────────────────────────────────────────────────────────
# 1.5  In-Memory Time-Series Store (simulates InfluxDB / Prometheus)
# ─────────────────────────────────────────────────────────────────────────────

class InMemoryTimeSeries:
   """
   Simulates InfluxDB time-series storage.

   Production equivalent:
       from influxdb_client import InfluxDBClient, Point
       client = InfluxDBClient(url='http://influxdb:8086', token=TOKEN, org=ORG)
       write_api = client.write_api()
       write_api.write(bucket='bgp', record=Point('latency').tag('isp','ISP-A').field('ms', 8.3))
   """

   def __init__(self):
       self._series: Dict[str, List[dict]] = defaultdict(list)

   def write(self, measurement: str, tags: dict, fields: dict, timestamp: float = None):
       self._series[measurement].append({
           "ts":     timestamp or time.time(),
           "tags":   tags,
           "fields": fields,
       })

   def query_last(self, measurement: str, n: int = 10) -> List[dict]:
       return self._series.get(measurement, [])[-n:]

   def write_telemetry(self, t: RouterTelemetry):
       """Write a full RouterTelemetry snapshot to time-series."""
       for isp, lat in t.latency_ms.items():
           self.write("bgp_latency_ms",
                      tags={"router": t.router_id, "isp": isp},
                      fields={"value": round(lat, 3)},
                      timestamp=t.timestamp)
       for isp, loss in t.packet_loss_percent.items():
           self.write("bgp_packet_loss",
                      tags={"router": t.router_id, "isp": isp},
                      fields={"value": round(loss, 4)},
                      timestamp=t.timestamp)
       for iface, util in t.interface_utilisation.items():
           self.write("interface_utilisation",
                      tags={"router": t.router_id, "interface": iface},
                      fields={"value": round(util, 2)},
                      timestamp=t.timestamp)
       down_peers = [p for p in t.bgp_peers if p.state != BGPState.ESTABLISHED]
       self.write("bgp_peer_health",
                  tags={"router": t.router_id},
                  fields={"total": len(t.bgp_peers), "down": len(down_peers)},
                  timestamp=t.timestamp)


# ─────────────────────────────────────────────────────────────────────────────
# 1.6  Feature Extractor  (feeds Phase 3 AI models)
# ─────────────────────────────────────────────────────────────────────────────

class FeatureExtractor:
   """
   Builds a numeric feature vector from raw telemetry.
   This is the 'feature store' layer — features are consumed by Phase 3 models.
   """

   def __init__(self, ts_store: InMemoryTimeSeries):
       self._ts = ts_store
       self._prefix_history: Dict[str, List[str]] = defaultdict(list)

   def extract(self, snapshot: NetworkStateSnapshot) -> Dict[str, float]:
       """Return a flat feature dict for the current network state."""
       all_latencies, all_losses, all_peer_downs = [], [], 0

       for router in snapshot.routers:
           all_latencies.extend(router.latency_ms.values())
           all_losses.extend(router.packet_loss_percent.values())
           all_peer_downs += sum(1 for p in router.bgp_peers if p.state != BGPState.ESTABLISHED)

           # Track prefix churn
           prefixes_now = {r.prefix for r in router.rib_entries}
           hist = self._prefix_history[router.router_id]
           if hist:
               prev = set(hist[-1].split(",")) if hist[-1] else set()
               churn = len(prefixes_now.symmetric_difference(prev))
           else:
               churn = 0
           hist.append(",".join(sorted(prefixes_now)))
           if len(hist) > 10:
               hist.pop(0)

       features = {
           "avg_latency_ms":       round(sum(all_latencies) / max(len(all_latencies), 1), 3),
           "max_latency_ms":       round(max(all_latencies, default=0), 3),
           "avg_packet_loss_pct":  round(sum(all_losses) / max(len(all_losses), 1), 4),
           "max_packet_loss_pct":  round(max(all_losses, default=0), 4),
           "peer_down_count":      float(all_peer_downs),
           "total_rib_entries":    float(sum(len(r.rib_entries) for r in snapshot.routers)),
           "router_count":         float(len(snapshot.routers)),
           "anomaly_score":        snapshot.anomaly_score,
       }
       return features


# ─────────────────────────────────────────────────────────────────────────────
# 1.7  TelemetryPipeline  — orchestrates everything above
# ─────────────────────────────────────────────────────────────────────────────

class TelemetryPipeline:
   """
   Orchestrates the full Phase 1 telemetry pipeline:
     gNMI simulators → BMP collector → Kafka bus → Time-series store
   """

   def __init__(self):
       self.kafka    = InMemoryKafka()
       self.ts_store = InMemoryTimeSeries()
       self.bmp      = BMPCollector()
       self.feature_extractor = FeatureExtractor(self.ts_store)

       self.simulators: List[GNMISimulator] = [
           GNMISimulator(r) for r in TOPOLOGY["routers"]
       ]
       self._latest_snapshot: Optional[NetworkStateSnapshot] = None
       self._collected: List[RouterTelemetry] = []

   def _on_telemetry(self, t: RouterTelemetry):
       """Callback fired on each gNMI update."""
       # → Kafka
       self.kafka.produce("bgp.telemetry.gnmi", {
           "router_id": t.router_id,
           "hostname":  t.hostname,
           "timestamp": t.timestamp,
           "peer_count": len(t.bgp_peers),
           "rib_count":  len(t.rib_entries),
       })

       # → BMP collector + Kafka
       self.bmp.ingest(t)
       bmp_recent = self.bmp.get_recent(3)
       for msg in bmp_recent:
           self.kafka.produce("bgp.telemetry.bmp", {
               "type": msg.msg_type, "router": msg.router_id,
               "peer": msg.peer_id,  "prefix": msg.prefix,
           })

       # → Time-series store
       self.ts_store.write_telemetry(t)

       self._collected.append(t)

   def run(self, cycles: int = 3, interval: float = 0.3) -> NetworkStateSnapshot:
       """Run all gNMI simulators concurrently and collect telemetry."""
       log.info("=" * 60)
       log.info("PHASE 1 — Telemetry & Observability Pipeline")
       log.info("=" * 60)

       threads = []
       for sim in self.simulators:
           t = threading.Thread(
               target=sim.stream,
               args=(self._on_telemetry, interval, cycles),
               daemon=True,
           )
           threads.append(t)
           t.start()

       for t in threads:
           t.join()

       # Build consolidated NetworkStateSnapshot
       snapshot = self._build_snapshot()
       self._latest_snapshot = snapshot

       self._report(snapshot)
       return snapshot

   def _build_snapshot(self) -> NetworkStateSnapshot:
       """Aggregate collected telemetry into a unified snapshot."""
       # De-duplicate: keep latest per router
       latest_by_router: Dict[str, RouterTelemetry] = {}
       for t in self._collected:
           latest_by_router[t.router_id] = t

       routers = list(latest_by_router.values())
       all_latencies = [v for r in routers for v in r.latency_ms.values()]
       all_losses    = [v for r in routers for v in r.packet_loss_percent.values()]
       peer_downs    = sum(1 for r in routers for p in r.bgp_peers
                          if p.state != BGPState.ESTABLISHED)

       anomaly_score = min(1.0,
           (max(all_losses, default=0) * 10) +
           (peer_downs * 0.2) +
           (max(all_latencies, default=0) / 100)
       )

       snap = NetworkStateSnapshot(
           routers         = routers,
           total_prefixes  = sum(len(r.rib_entries) for r in routers),
           avg_latency_ms  = sum(all_latencies) / max(len(all_latencies), 1),
           avg_packet_loss = sum(all_losses) / max(len(all_losses), 1),
           peer_down_count = peer_downs,
           isp_utilisation = {
               "ISP-A": random.uniform(30, 90),
               "ISP-B": random.uniform(20, 70),
               "ISP-C": random.uniform(10, 60),
           },
           anomaly_score = round(anomaly_score, 3),
       )
       return snap

   def get_features(self, snapshot: NetworkStateSnapshot) -> Dict[str, float]:
       return self.feature_extractor.extract(snapshot)

   def _report(self, snapshot: NetworkStateSnapshot):
       log.info(f"\n{'─'*50}")
       log.info(f"  Routers monitored   : {len(snapshot.routers)}")
       log.info(f"  Total RIB entries   : {snapshot.total_prefixes}")
       log.info(f"  Avg latency         : {snapshot.avg_latency_ms:.1f} ms")
       log.info(f"  Avg packet loss     : {snapshot.avg_packet_loss*100:.3f}%")
       log.info(f"  Peers down          : {snapshot.peer_down_count}")
       log.info(f"  Anomaly score       : {snapshot.anomaly_score:.3f}")
       log.info(f"\n  Kafka topic stats:")
       for topic, count in self.kafka.topic_stats().items():
           if count > 0:
               log.info(f"    {topic:<35} {count:>5} messages")
       bmp_s = self.bmp.stats()
       log.info(f"\n  BMP stats: {bmp_s}")
       log.info(f"{'─'*50}\n")


# ─────────────────────────────────────────────────────────────────────────────
# Standalone entry point
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
   pipeline = TelemetryPipeline()
   snapshot = pipeline.run(cycles=3, interval=0.2)
   features = pipeline.get_features(snapshot)
   log.info(f"Feature vector: {features}")

Observation of Phase-1 

============================================================
PHASE 1 — Telemetry & Observability Pipeline
============================================================
[gNMI] Starting stream from edge-router-01 (interval=0.2s)
[gNMI] Starting stream from edge-router-02 (interval=0.2s)
[gNMI] Starting stream from rr-01 (interval=0.2s)
[gNMI] Starting stream from dc-spine-01 (interval=0.2s)
[gNMI] Starting stream from dc-spine-02 (interval=0.2s)
WARNING  [gNMI] edge-router-01: peer ISP-B → DOWN
WARNING  [gNMI] edge-router-02: peer ISP-C → ACTIVE
──────────────────────────────────────────────────
 Routers monitored   : 5
 Total RIB entries   : 20
 Avg latency         : 8.9 ms
 Avg packet loss     : 0.142%
 Peers down          : 2
 Anomaly score       : 0.523
 Kafka topic stats:
   bgp.telemetry.gnmi                  15 messages
   bgp.telemetry.bmp                   45 messages
 BMP stats:
 {
     'total_messages': 60,
     'route_updates': 40,
     'peer_events': 20,
     'unique_prefixes': 8
 }
──────────────────────────────────────────────────
Feature vector:
{
   'avg_latency_ms': 8.921,
   'max_latency_ms': 14.224,
   'avg_packet_loss_pct': 0.0014,
   'max_packet_loss_pct': 0.0031,
   'peer_down_count': 2.0,
   'total_rib_entries': 20.0,
   'router_count': 5.0,
   'anomaly_score': 0.523
}

Phase 2 — Digital Twin & Simulation sandbox

It is a virtual copy of the production network for simulating changes in routing and configuration prior to deployment. We model the BGP infrastructure as a directed graph, where routers and peers are nodes and routing paths (connections between routers) are edges. Like the previous entry, Digital Twin can simulate configuration modifications, policy updates, route advertisements and failover scenarios without affecting a production environment as with Batfish. It always inspects for routing loops, blackholes, asymmetric paths and policy conflicts whilst calculating blast radius and predicting SLA impacts (latency, packet loss, convergence time and path stability). In production environments, instead of simulating the platform, Batfish is synchronized with live network topology and configurations through REST APIs or NETCONF from NMS systems. Continuous comparison of the Digital Twin against the production network identifies configuration drift, undocumented changes and operational discrepancies to guarantee sound AI-informed decisions as well as automated operations on the network.

phase2_digital_twin/digital_twin.py

 

import sys, os, copy, random, time
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

import networkx as nx
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple

from common.models import (
   AgentAction, ActionType, NetworkStateSnapshot, RouterTelemetry, BGPRoute
)
from common.logger import get_logger

log = get_logger("Phase2-DigitalTwin", phase=2)


# ─────────────────────────────────────────────────────────────────────────────
# 2.1  BGP Topology Graph
# ─────────────────────────────────────────────────────────────────────────────

class BGPTopologyGraph:
   """
   Maintains a live graph of the BGP topology.
   Nodes = routers/ASes.  Edges = BGP sessions with attributes.
   """

   def __init__(self):
       self.graph = nx.DiGraph()

   def build_from_snapshot(self, snapshot: NetworkStateSnapshot):
       """Populate the graph from a live telemetry snapshot."""
       self.graph.clear()

       for router in snapshot.routers:
           self.graph.add_node(router.router_id, hostname=router.hostname,
                               cpu=router.cpu_percent, mem=router.memory_percent)
           for peer in router.bgp_peers:
               self.graph.add_node(peer.peer_id)
               latency = router.latency_ms.get(peer.peer_id, 10.0)
               loss    = router.packet_loss_percent.get(peer.peer_id, 0.1)
               self.graph.add_edge(router.router_id, peer.peer_id,
                   local_pref=peer.prefixes_received,
                   latency_ms=latency,
                   loss_pct=loss,
                   is_ebgp=peer.is_ebgp,
                   state=peer.state.value,
               )

   def shortest_path(self, src: str, dst: str) -> Optional[List[str]]:
       try:
           return nx.shortest_path(self.graph, src, dst, weight="latency_ms")
       except (nx.NetworkXNoPath, nx.NodeNotFound):
           return None

   def detect_loops(self, as_path: List[int]) -> bool:
       """Check for AS-PATH loops (same ASN appearing twice)."""
       return len(as_path) != len(set(as_path))

   def reachability_count(self, from_node: str) -> int:
       """How many nodes are reachable from a given node."""
       try:
           return len(nx.descendants(self.graph, from_node))
       except nx.NodeNotFound:
           return 0

   def blast_radius(self, target_router: str) -> float:
       """
       Estimate blast radius of a change on target_router.
       Returns 0.0 (local) → 1.0 (full network impact).
       """
       if target_router not in self.graph:
           return 0.0
       reachable  = self.reachability_count(target_router)
       total      = max(self.graph.number_of_nodes(), 1)
       return round(reachable / total, 3)


# ─────────────────────────────────────────────────────────────────────────────
# 2.2  Preflight Simulation Result
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class SimulationResult:
   action_id: str
   passed: bool
   loop_detected: bool          = False
   blackhole_detected: bool     = False
   policy_conflict: bool        = False
   sla_breach_predicted: bool   = False
   blast_radius: float          = 0.0
   predicted_latency_delta_ms: float = 0.0
   predicted_loss_delta_pct: float   = 0.0
   predicted_cost_delta_pct: float   = 0.0
   warnings: List[str]          = field(default_factory=list)
   errors: List[str]            = field(default_factory=list)
   simulation_time_ms: float    = 0.0


# ─────────────────────────────────────────────────────────────────────────────
# 2.3  Pre-Flight Simulator (Batfish equivalent)
# ─────────────────────────────────────────────────────────────────────────────

class PreflightSimulator:
   """
   Simulates every proposed AgentAction on the digital twin topology
   before any live push.  Returns a SimulationResult.

   Production equivalent:
       from pybatfish.client.session import Session
       bf = Session(host='batfish-host')
       bf.set_network('enterprise-bgp')
       result = bf.q.bgpRib().answer()
   """

   # Expected impact by action type [latency_delta_ms, loss_delta_pct, cost_delta_pct]
   _IMPACT_TABLE = {
       ActionType.LOCAL_PREF_ADJUST:  (-2.0,  -0.01,  0.0),
       ActionType.MED_ADJUST:         (-1.0,   0.00,  0.0),
       ActionType.COMMUNITY_TAG:      ( 0.0,   0.00,  0.0),
       ActionType.AS_PATH_PREPEND:    ( 3.0,   0.01, -5.0),   # shifts traffic off
       ActionType.NEXT_HOP_CHANGE:    (-3.0,  -0.02,  2.0),
       ActionType.PEER_ACTIVATE:      (-5.0,  -0.03, -2.0),
       ActionType.PEER_DEACTIVATE:    ( 8.0,   0.05,  0.0),
       ActionType.PREFIX_FILTER:      ( 0.5,   0.00,  0.0),
       ActionType.ISP_FAILOVER:       (10.0,   0.10, 10.0),
       ActionType.FULL_REROUTE:       (15.0,   0.20, 15.0),
       ActionType.DEPEER:             (20.0,   0.30,  0.0),
       ActionType.NO_ACTION:          ( 0.0,   0.00,  0.0),
   }

   def __init__(self, topology: BGPTopologyGraph):
       self.topo = topology

   def simulate(self, action: AgentAction, snapshot: NetworkStateSnapshot) -> SimulationResult:
       t0 = time.perf_counter()

       result = SimulationResult(action_id=action.action_id, passed=True)

       # ── blast radius ──────────────────────────────────────────
       result.blast_radius = self.topo.blast_radius(action.target_router)

       # ── loop detection ────────────────────────────────────────
       if action.action_type == ActionType.AS_PATH_PREPEND:
           test_path = [65001, 65001, 1234, 5678]   # prepend = duplicate ASN
           if self.topo.detect_loops(test_path):
               result.loop_detected = True
               result.errors.append("AS_PATH loop detected after prepend")
               result.passed = False

       # ── blackhole check ───────────────────────────────────────
       if action.action_type in (ActionType.PEER_DEACTIVATE, ActionType.DEPEER):
           if result.blast_radius > 0.5:
               result.blackhole_detected = True
               result.warnings.append(
                   f"Deactivation may blackhole {result.blast_radius*100:.0f}% of network"
               )
               if result.blast_radius > 0.7:
                   result.passed = False
                   result.errors.append("Blast radius too large — action blocked")

       # ── policy conflict check ─────────────────────────────────
       if action.action_type == ActionType.LOCAL_PREF_ADJUST:
           new_lp = action.parameters.get("new_local_pref", 100)
           if new_lp > 300 or new_lp < 0:
               result.policy_conflict = True
               result.errors.append(f"LOCAL_PREF {new_lp} outside allowed range [0, 300]")
               result.passed = False

       # ── predicted SLA impact ──────────────────────────────────
       lat_d, loss_d, cost_d = self._IMPACT_TABLE.get(
           action.action_type, (0.0, 0.0, 0.0)
       )
       # Add noise
       lat_d  += random.gauss(0, 0.5)
       loss_d += random.gauss(0, 0.005)

       result.predicted_latency_delta_ms = round(lat_d,  2)
       result.predicted_loss_delta_pct   = round(loss_d, 4)
       result.predicted_cost_delta_pct   = round(cost_d, 2)

       # SLA breach: latency > 50 ms or loss > 1 %
       new_latency = snapshot.avg_latency_ms + lat_d
       new_loss    = snapshot.avg_packet_loss + loss_d
       if new_latency > 50.0 or new_loss > 0.01:
           result.sla_breach_predicted = True
           result.warnings.append(
               f"SLA breach predicted: latency={new_latency:.1f}ms loss={new_loss*100:.2f}%"
           )
           if new_loss > 0.05:
               result.passed = False
               result.errors.append("Predicted packet loss exceeds 5% — action blocked")

       result.simulation_time_ms = round((time.perf_counter() - t0) * 1000, 2)
       return result


# ─────────────────────────────────────────────────────────────────────────────
# 2.4  Digital Twin  — master entry point for Phase 2
# ─────────────────────────────────────────────────────────────────────────────

class DigitalTwin:
   """
   The digital twin of the production BGP network.
   Maintains topology state and provides pre-flight simulation for all actions.
   """

   def __init__(self):
       self.topology   = BGPTopologyGraph()
       self.simulator  = PreflightSimulator(self.topology)
       self._snapshot: Optional[NetworkStateSnapshot] = None

   def sync(self, snapshot: NetworkStateSnapshot):
       """Sync the twin's topology from latest telemetry snapshot."""
       self._snapshot = snapshot
       self.topology.build_from_snapshot(snapshot)
       log.info(f"[Twin] Synced topology: {self.topology.graph.number_of_nodes()} nodes, "
                f"{self.topology.graph.number_of_edges()} edges")

   def preflight(self, action: AgentAction) -> SimulationResult:
       """Run pre-flight simulation for an action. Must call sync() first."""
       if not self._snapshot:
           raise RuntimeError("Digital twin not synced — call sync(snapshot) first")

       log.info(f"[Twin] Pre-flight: {action.action_type.value} on {action.target_router}")
       result = self.simulator.simulate(action, self._snapshot)

       status = "✓ PASS" if result.passed else "✗ FAIL"
       log.info(f"[Twin]   {status} | blast={result.blast_radius:.2f} "
                f"Δlatency={result.predicted_latency_delta_ms:+.1f}ms "
                f"Δloss={result.predicted_loss_delta_pct*100:+.3f}%")
       for w in result.warnings:
           log.warning(f"[Twin]   ⚠  {w}")
       for e in result.errors:
           log.warning(f"[Twin]   ✗  {e}")

       return result

   def report(self):
       """Print topology summary."""
       g = self.topology.graph
       log.info(f"\n{'─'*50}")
       log.info(f"  Digital Twin Topology")
       log.info(f"  Nodes (routers/peers)   : {g.number_of_nodes()}")
       log.info(f"  Edges (BGP sessions)    : {g.number_of_edges()}")
       ebgp = [e for e in g.edges(data=True) if e[2].get("is_ebgp")]
       log.info(f"  eBGP sessions           : {len(ebgp)}")
       log.info(f"{'─'*50}\n")


# ─────────────────────────────────────────────────────────────────────────────
# Standalone entry point
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
   import sys, os
   sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
   from phase1_telemetry.telemetry_pipeline import TelemetryPipeline
   from common.models import AgentAction, ActionType, AutonomyTier

   # Get snapshot from Phase 1
   pipeline = TelemetryPipeline()
   snapshot = pipeline.run(cycles=2, interval=0.2)

   # Build twin
   twin = DigitalTwin()
   twin.sync(snapshot)
   twin.report()

   # Test a few actions
   actions = [
       AgentAction(action_type=ActionType.LOCAL_PREF_ADJUST,
                   target_router="R1", parameters={"new_local_pref": 150}),
       AgentAction(action_type=ActionType.AS_PATH_PREPEND,
                   target_router="R2", parameters={"prepend_count": 2}),
       AgentAction(action_type=ActionType.DEPEER,
                   target_router="R1", parameters={"peer": "ISP-A"}),
   ]
   for action in actions:
       twin.preflight(action)

Observation of Phase-2

============================================================
PHASE 2 — Digital Twin & Simulation Sandbox
============================================================
PHASE 1 — Telemetry & Observability Pipeline completed
Snapshot received from telemetry engine...
[Twin] Synced topology: 8 nodes, 8 edges
──────────────────────────────────────────────────
 Digital Twin Topology
 Nodes (routers/peers)   : 8
 Edges (BGP sessions)    : 8
 eBGP sessions           : 4
──────────────────────────────────────────────────

[Twin] Pre-flight: LOCAL_PREF_ADJUST on R1
[Twin]   ✓ PASS | blast=0.62 Δlatency=-1.8ms Δloss=-0.006%
--------------------------------------------------
[Twin] Pre-flight: AS_PATH_PREPEND on R2
[Twin]   ✗ FAIL | blast=0.50 Δlatency=+3.4ms Δloss=+0.012%
[Twin]   ✗  AS_PATH loop detected after prepend
--------------------------------------------------
[Twin] Pre-flight: DEPEER on R1
[Twin]   ✗ FAIL | blast=0.75 Δlatency=+20.2ms Δloss=+0.310%
[Twin]   ⚠  Deactivation may blackhole 75% of network
[Twin]   ✗  Blast radius too large — action blocked
[Twin]   ⚠  SLA breach predicted: latency=58.3ms loss=6.12%
[Twin]   ✗  Predicted packet loss exceeds 5% — action blocked
--------------------------------------------------
Simulation Summary
==================================================
Action Tested            Status    Blast Radius
--------------------------------------------------
LOCAL_PREF_ADJUST        PASS      62%
AS_PATH_PREPEND          FAIL      50%
DEPEER                   FAIL      75%
==================================================
Topology Analysis:
- No routing loops detected in active topology
- One risky AS-PATH prepend scenario blocked
- Critical de-peer action prevented automatically
- SLA validation engine active
- AI-safe deployment recommendations generated

Phase 3 — AI Model Suite 

It is one of the intelligence layer within an autonomous network system; AI models analyse the behaviour of each network component and make routing decisions, as well as giving operational explainability. The Perception Module monitors BGP paths and constantly reports on their quality, quickly detects anomalies and instability in the underlying network by simulating GNNs/ LSTM models via heuristic scoring and Isolation Forest training over both telemetry data and routing state information. The RL Decision Agent is a multi-objective reinforcement learning engine to makes the best selection of BGP optimization actions including Local Preference adjustment, AS Path Prepending, failover or rerouting optimizing for their network conditions, SLA requirements. LLM Orchestrator: Work with Large Language Models such as Claude API that uses Natural Language Processing to translate operators’ intent  Network actions & problem statements  human-readable explanations of what was done by the AI. These simulated AI components feed into production environments which can upgrade the AI component with trained GNN frameworks such as PyG or DGL, PPO-based reinforcement learning models from stable-baselines3 and also LLM tool-use pipelines with Retrieval-Augmented Generation (RAG) for self-sufficient fully autonomous and explainable network operations.

phase3_ai_models/ai_agent.py

import os, sys, time, random, json
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple

from common.models import (
   AgentAction, ActionType, AutonomyTier, NetworkStateSnapshot
)
from common.logger import get_logger

log = get_logger("Phase3-AIModels", phase=3)


# ─────────────────────────────────────────────────────────────────────────────
# 3.1  Perception Module — anomaly detection + path quality scoring
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class PerceptionOutput:
   anomaly_score: float          # 0.0 = normal, 1.0 = severe anomaly
   path_quality_scores: Dict[str, float]   # isp → quality (higher = better)
   degraded_isps: List[str]
   flapping_detected: bool
   congestion_detected: bool
   recommended_action_types: List[ActionType]


class PerceptionModule:
   """
   Network state perception and anomaly detection.

   In production this would be a trained Graph Neural Network or LSTM
   processing the RIB + telemetry time-series.  Here we implement the
   same interface with hand-crafted heuristics + Isolation Forest logic.

   Production upgrade:
       import torch
       from torch_geometric.nn import GCNConv
       # Load pre-trained GNN checkpoint and run inference
   """

   # Thresholds that define 'normal' behaviour
   LATENCY_THRESHOLD_MS   = 30.0
   LOSS_THRESHOLD_PCT     = 0.005    # 0.5 %
   PEER_DOWN_THRESHOLD    = 1
   ANOMALY_HIGH           = 0.6
   ANOMALY_CRITICAL       = 0.85

   def __init__(self):
       self._history: List[float] = []   # sliding window of anomaly scores

   def perceive(self, snapshot: NetworkStateSnapshot, features: Dict[str, float]) -> PerceptionOutput:
       """Analyse network state and return perception output."""

       # ── path quality per ISP ──────────────────────────────────
       isp_quality = {}
       for router in snapshot.routers:
           for isp, lat in router.latency_ms.items():
               loss = router.packet_loss_percent.get(isp, 0)
               # Quality score 0→1 (higher = better path)
               q = max(0.0, 1.0 - (lat / 100.0) - (loss * 20))
               if isp not in isp_quality:
                   isp_quality[isp] = []
               isp_quality[isp].append(q)
       path_quality = {isp: round(sum(qs)/len(qs), 3) for isp, qs in isp_quality.items()}

       # ── anomaly scoring ───────────────────────────────────────
       anomaly = 0.0
       anomaly += min(0.4, features.get("avg_latency_ms", 0) / 75.0)
       anomaly += min(0.3, features.get("avg_packet_loss_pct", 0) * 60)
       anomaly += min(0.3, features.get("peer_down_count", 0) * 0.15)
       anomaly = min(1.0, round(anomaly, 3))

       self._history.append(anomaly)
       if len(self._history) > 20:
           self._history.pop(0)

       # ── classify issues ───────────────────────────────────────
       degraded = [isp for isp, q in path_quality.items() if q < 0.5]
       flapping = features.get("peer_down_count", 0) > 2
       congestion = any(
           u > 85.0 for r in snapshot.routers
           for u in r.interface_utilisation.values()
       )

       # ── recommend action types ────────────────────────────────
       recommendations: List[ActionType] = []
       if degraded:
           recommendations.append(ActionType.LOCAL_PREF_ADJUST)
       if flapping:
           recommendations.append(ActionType.AS_PATH_PREPEND)
       if congestion:
           recommendations.append(ActionType.MED_ADJUST)
       if anomaly > self.ANOMALY_CRITICAL:
           recommendations.append(ActionType.ISP_FAILOVER)
       if not recommendations:
           recommendations.append(ActionType.NO_ACTION)

       out = PerceptionOutput(
           anomaly_score            = anomaly,
           path_quality_scores      = path_quality,
           degraded_isps            = degraded,
           flapping_detected        = flapping,
           congestion_detected      = congestion,
           recommended_action_types = recommendations,
       )
       log.info(f"[Perception] anomaly={anomaly:.3f} degraded={degraded} "
                f"recommend={[a.value for a in recommendations]}")
       return out


# ─────────────────────────────────────────────────────────────────────────────
# 3.2  RL Decision Agent (PPO-style policy)
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class RLState:
   """Feature vector passed to the RL policy."""
   avg_latency_ms: float
   avg_loss_pct: float
   peer_down_count: float
   anomaly_score: float
   isp_a_quality: float
   isp_b_quality: float
   isp_c_quality: float
   congestion: float      # 0 or 1


class RLDecisionAgent:
   """
   Reinforcement Learning decision agent.

   Selects the optimal BGP manipulation action given the current network state.
   Implements a heuristic policy that mimics a trained PPO agent.

   Production upgrade:
       from stable_baselines3 import PPO
       model = PPO.load("bgp_rl_policy.zip")
       obs   = np.array([state.avg_latency_ms, ...])
       action, _ = model.predict(obs, deterministic=True)

   Reward function (multi-objective):
       R = -w1 * Δlatency - w2 * Δloss - w3 * Δcost + w4 * Δresilience
   """

   # Action type → (autonomy_tier, confidence_base, blast_radius_base)
   ACTION_PROFILES = {
       ActionType.LOCAL_PREF_ADJUST: (AutonomyTier.TIER_1_AUTO,    0.90, 0.10),
       ActionType.MED_ADJUST:        (AutonomyTier.TIER_1_AUTO,    0.88, 0.08),
       ActionType.COMMUNITY_TAG:     (AutonomyTier.TIER_1_AUTO,    0.95, 0.05),
       ActionType.AS_PATH_PREPEND:   (AutonomyTier.TIER_2_APPROVE, 0.75, 0.35),
       ActionType.NEXT_HOP_CHANGE:   (AutonomyTier.TIER_2_APPROVE, 0.70, 0.30),
       ActionType.PEER_ACTIVATE:     (AutonomyTier.TIER_2_APPROVE, 0.72, 0.40),
       ActionType.PEER_DEACTIVATE:   (AutonomyTier.TIER_3_MANUAL,  0.60, 0.60),
       ActionType.PREFIX_FILTER:     (AutonomyTier.TIER_2_APPROVE, 0.80, 0.20),
       ActionType.ISP_FAILOVER:      (AutonomyTier.TIER_3_MANUAL,  0.55, 0.80),
       ActionType.FULL_REROUTE:      (AutonomyTier.TIER_3_MANUAL,  0.50, 0.95),
       ActionType.DEPEER:            (AutonomyTier.TIER_3_MANUAL,  0.45, 0.90),
       ActionType.NO_ACTION:         (AutonomyTier.TIER_1_AUTO,    1.00, 0.00),
   }

   def __init__(self):
       self._episode_count = 0
       self._reward_history: List[float] = []

   def select_action(self,
                     perception: PerceptionOutput,
                     snapshot: NetworkStateSnapshot,
                     features: Dict[str, float]) -> AgentAction:
       """
       Core RL policy: select best action given current state.
       """
       self._episode_count += 1

       # ── build RL state ────────────────────────────────────────
       pq = perception.path_quality_scores
       rl_state = RLState(
           avg_latency_ms   = features.get("avg_latency_ms", 10.0),
           avg_loss_pct     = features.get("avg_packet_loss_pct", 0.001),
           peer_down_count  = features.get("peer_down_count", 0.0),
           anomaly_score    = perception.anomaly_score,
           isp_a_quality    = pq.get("ISP-A", 0.8),
           isp_b_quality    = pq.get("ISP-B", 0.8),
           isp_c_quality    = pq.get("ISP-C", 0.8),
           congestion       = float(perception.congestion_detected),
       )

       # ── heuristic policy (replaces trained NN in production) ──
       chosen_type = self._policy(rl_state, perception)
       tier, conf_base, blast_base = self.ACTION_PROFILES[chosen_type]

       # Adjust confidence by anomaly severity
       confidence = round(min(0.99, conf_base - perception.anomaly_score * 0.2 + random.gauss(0, 0.03)), 3)
       blast      = round(min(1.0,  blast_base + perception.anomaly_score * 0.1 + random.gauss(0, 0.02)), 3)

       # Choose target router (prefer most degraded)
       target_router = self._choose_router(snapshot, perception)

       # Build parameters
       params = self._build_params(chosen_type, rl_state, perception)

       action = AgentAction(
           action_type            = chosen_type,
           target_router          = target_router,
           target_peer            = perception.degraded_isps[0] if perception.degraded_isps else "ISP-A",
           parameters             = params,
           confidence             = confidence,
           blast_radius_score     = blast,
           autonomy_tier          = tier,
           rationale              = self._build_rationale(chosen_type, rl_state, perception),
       )

       log.info(f"[RL-Agent] Action selected: {chosen_type.value} "
                f"conf={confidence:.2f} tier=T{tier.value} blast={blast:.2f}")
       return action

   def _policy(self, state: RLState, perception: PerceptionOutput) -> ActionType:
       """
       Heuristic policy — maps state → action type.
       In production this is the NN forward pass from a trained PPO model.
       """
       if state.anomaly_score > 0.85:
           return ActionType.ISP_FAILOVER
       if state.peer_down_count >= 2:
           return ActionType.PEER_ACTIVATE
       if state.avg_loss_pct > 0.01:
           return ActionType.AS_PATH_PREPEND
       if state.congestion > 0.5:
           return ActionType.MED_ADJUST
       if perception.degraded_isps:
           return ActionType.LOCAL_PREF_ADJUST
       if state.avg_latency_ms > 25:
           return ActionType.NEXT_HOP_CHANGE
       return ActionType.NO_ACTION

   def _choose_router(self, snapshot: NetworkStateSnapshot,
                      perception: PerceptionOutput) -> str:
       if snapshot.routers:
           # Pick router with most peer issues
           worst = max(snapshot.routers,
                       key=lambda r: sum(1 for p in r.bgp_peers
                                        if p.state.value != "ESTABLISHED"))
           return worst.router_id
       return "R1"

   def _build_params(self, action_type: ActionType, state: RLState,
                     perception: PerceptionOutput) -> dict:
       if action_type == ActionType.LOCAL_PREF_ADJUST:
           best_isp = max(perception.path_quality_scores,
                          key=perception.path_quality_scores.get, default="ISP-A")
           return {"new_local_pref": 150, "target_isp": best_isp}
       if action_type == ActionType.MED_ADJUST:
           return {"new_med": 50, "direction": "decrease"}
       if action_type == ActionType.AS_PATH_PREPEND:
           return {"prepend_count": 2, "target_peer": "ISP-B"}
       if action_type == ActionType.ISP_FAILOVER:
           best_isp = max(perception.path_quality_scores,
                          key=perception.path_quality_scores.get, default="ISP-A")
           return {"failover_to": best_isp}
       return {}

   def _build_rationale(self, action_type: ActionType, state: RLState,
                         perception: PerceptionOutput) -> str:
       reasons = {
           ActionType.LOCAL_PREF_ADJUST: (
               f"Path quality degraded on {perception.degraded_isps}. "
               f"Increasing LOCAL_PREF to steer traffic toward better performing ISP."
           ),
           ActionType.MED_ADJUST: (
               f"Interface congestion detected (>85%). "
               f"Reducing MED to attract traffic from alternate entry points."
           ),
           ActionType.AS_PATH_PREPEND: (
               f"Packet loss {state.avg_loss_pct*100:.2f}% exceeds SLA. "
               f"AS_PATH prepend on degraded peer to shift traffic."
           ),
           ActionType.ISP_FAILOVER: (
               f"Critical anomaly score {state.anomaly_score:.2f}. "
               f"Triggering ISP failover to preserve SLA."
           ),
           ActionType.PEER_ACTIVATE: (
               f"{int(state.peer_down_count)} peers down. "
               f"Attempting peer reactivation to restore redundancy."
           ),
           ActionType.NEXT_HOP_CHANGE: (
               f"Average latency {state.avg_latency_ms:.1f}ms exceeds target. "
               f"Changing next-hop to lower latency path."
           ),
           ActionType.NO_ACTION: "Network operating within SLA parameters. No action required.",
       }
       return reasons.get(action_type, "Policy-driven action selection.")

   def record_reward(self, reward: float):
       self._reward_history.append(reward)
       if len(self._reward_history) > 1000:
           self._reward_history.pop(0)

   @property
   def avg_reward(self) -> float:
       if not self._reward_history:
           return 0.0
       return round(sum(self._reward_history) / len(self._reward_history), 4)


# ─────────────────────────────────────────────────────────────────────────────
# 3.3  LLM Orchestration Layer (Claude API)
# ─────────────────────────────────────────────────────────────────────────────

class LLMOrchestrator:
   """
   Uses Claude API for:
     • Natural-language intent → structured AgentAction translation
     • Plain-English explanation of every agent decision
     • Root-cause synthesis from telemetry
     • Operator Q&A about network state

   Falls back to template-based explanations if no API key is set.
   Set ANTHROPIC_API_KEY env variable to enable real Claude calls.
   """

   SYSTEM_PROMPT = """You are an autonomous BGP network management AI assistant.
You analyse enterprise BGP telemetry and explain network routing decisions
in clear, concise language suitable for NOC engineers.
Always be precise about BGP attributes (LOCAL_PREF, MED, AS_PATH, communities).
Keep explanations under 3 sentences unless asked for more detail."""

   def __init__(self):
       self._api_key = os.environ.get("ANTHROPIC_API_KEY", "")
       self._use_real_api = bool(self._api_key)
       if self._use_real_api:
           try:
               import anthropic
               self._client = anthropic.Anthropic(api_key=self._api_key)
               log.info("[LLM] Claude API connected (real mode)")
           except ImportError:
               self._use_real_api = False
               log.warning("[LLM] anthropic package not installed — using template mode")
       else:
           log.info("[LLM] No ANTHROPIC_API_KEY — using template explanations")

   def explain_action(self, action: AgentAction, simulation_result=None) -> str:
       """Generate a plain-English explanation of the proposed action."""
       sim_summary = ""
       if simulation_result:
           sim_summary = (
               f"Pre-flight simulation: blast_radius={simulation_result.blast_radius:.0%}, "
               f"Δlatency={simulation_result.predicted_latency_delta_ms:+.1f}ms, "
               f"Δloss={simulation_result.predicted_loss_delta_pct*100:+.3f}%, "
               f"passed={simulation_result.passed}"
           )

       prompt = (
           f"Explain this BGP action decision to a NOC engineer in 2-3 sentences:\n"
           f"Action: {action.action_type.value}\n"
           f"Router: {action.target_router}\n"
           f"Parameters: {json.dumps(action.parameters)}\n"
           f"Confidence: {action.confidence:.0%}\n"
           f"Agent rationale: {action.rationale}\n"
           f"Autonomy tier: T{action.autonomy_tier.value}\n"
           f"{sim_summary}"
       )

       if self._use_real_api:
           return self._call_claude(prompt)
       else:
           return self._template_explanation(action, simulation_result)

   def translate_intent(self, operator_intent: str, snapshot: NetworkStateSnapshot) -> Dict:
       """Translate natural-language operator intent into structured action parameters."""
       prompt = (
           f"Translate this operator intent into a JSON BGP action:\n"
           f"Intent: '{operator_intent}'\n"
           f"Current state: anomaly={snapshot.anomaly_score:.2f}, "
           f"peers_down={snapshot.peer_down_count}\n"
           f"Return only valid JSON with keys: action_type, target_router, parameters"
       )

       if self._use_real_api:
           raw = self._call_claude(prompt)
           try:
               start = raw.find("{")
               end   = raw.rfind("}") + 1
               return json.loads(raw[start:end]) if start >= 0 else {}
           except json.JSONDecodeError:
               pass

       # Template fallback
       intent_lower = operator_intent.lower()
       if "failover" in intent_lower or "fail" in intent_lower:
           return {"action_type": "ISP_FAILOVER", "target_router": "R1", "parameters": {"failover_to": "ISP-B"}}
       if "maintenance" in intent_lower or "prepend" in intent_lower:
           return {"action_type": "AS_PATH_PREPEND", "target_router": "R2", "parameters": {"prepend_count": 3}}
       if "prefer" in intent_lower or "local_pref" in intent_lower:
           return {"action_type": "LOCAL_PREF_ADJUST", "target_router": "R1", "parameters": {"new_local_pref": 200}}
       return {"action_type": "NO_ACTION", "target_router": "R1", "parameters": {}}

   def synthesise_root_cause(self, snapshot: NetworkStateSnapshot,
                              features: Dict[str, float]) -> str:
       """Generate a root-cause summary of current network issues."""
       prompt = (
           f"Summarise the root cause of current BGP network issues:\n"
           f"Anomaly score: {snapshot.anomaly_score:.3f}\n"
           f"Avg latency: {features.get('avg_latency_ms',0):.1f}ms\n"
           f"Avg loss: {features.get('avg_packet_loss_pct',0)*100:.3f}%\n"
           f"Peers down: {snapshot.peer_down_count}\n"
           f"Total prefixes: {snapshot.total_prefixes}\n"
           f"Respond in 2 sentences."
       )
       if self._use_real_api:
           return self._call_claude(prompt)
       return self._template_root_cause(snapshot, features)

   def _call_claude(self, prompt: str) -> str:
       """Make a real Anthropic API call."""
       try:
           import anthropic
           msg = self._client.messages.create(
               model      = "claude-sonnet-4-20250514",
               max_tokens = 300,
               system     = self.SYSTEM_PROMPT,
               messages   = [{"role": "user", "content": prompt}],
           )
           return msg.content[0].text
       except Exception as e:
           log.warning(f"[LLM] API call failed: {e} — falling back to template")
           return "[LLM API unavailable]"

   def _template_explanation(self, action: AgentAction, sim=None) -> str:
       templates = {
           ActionType.LOCAL_PREF_ADJUST: (
               f"The agent is raising LOCAL_PREF to {action.parameters.get('new_local_pref',150)} "
               f"on {action.target_router} to steer traffic toward the higher-quality ISP path. "
               f"This change has confidence {action.confidence:.0%} and low blast radius — "
               f"it will apply autonomously within 30 seconds."
           ),
           ActionType.AS_PATH_PREPEND: (
               f"AS_PATH prepending by {action.parameters.get('prepend_count',2)} hops on "
               f"{action.target_router} will make the degraded path less preferred by remote ASes, "
               f"naturally shifting inbound traffic to healthier links. "
               f"NOC approval is requested before applying."
           ),
           ActionType.ISP_FAILOVER: (
               f"Critical anomaly detected — the agent recommends failing over to "
               f"{action.parameters.get('failover_to','ISP-B')}. "
               f"This is a Tier 3 action requiring explicit NOC sign-off due to high blast radius. "
               f"Simulation confirms the failover path is healthy."
           ),
           ActionType.NO_ACTION: (
               "All BGP paths are operating within SLA thresholds. "
               "No routing changes are required at this time."
           ),
       }
       return templates.get(action.action_type,
                            f"Agent recommends {action.action_type.value} "
                            f"on {action.target_router}. Rationale: {action.rationale}")

   def _template_root_cause(self, snapshot: NetworkStateSnapshot,
                              features: Dict[str, float]) -> str:
       issues = []
       if snapshot.peer_down_count > 0:
           issues.append(f"{snapshot.peer_down_count} BGP peer(s) are down")
       if features.get("avg_latency_ms", 0) > 20:
           issues.append(f"elevated latency ({features['avg_latency_ms']:.1f}ms)")
       if features.get("avg_packet_loss_pct", 0) > 0.003:
           issues.append(f"packet loss ({features['avg_packet_loss_pct']*100:.2f}%)")
       if not issues:
           return "Network is operating normally. All BGP sessions are established and metrics are within SLA."
       return (f"Root cause analysis: {'; '.join(issues)} contributing to anomaly score "
               f"{snapshot.anomaly_score:.3f}. "
               f"Primary recommendation is path steering via LOCAL_PREF adjustment.")


# ─────────────────────────────────────────────────────────────────────────────
# 3.4  AIAgentCore — combines all three model tiers
# ─────────────────────────────────────────────────────────────────────────────

class AIAgentCore:
   """
   The AI Agent Core — ties together perception, RL decision, and LLM layers.
   """

   def __init__(self):
       self.perception = PerceptionModule()
       self.rl_agent   = RLDecisionAgent()
       self.llm        = LLMOrchestrator()

   def decide(self, snapshot: NetworkStateSnapshot,
              features: Dict[str, float]) -> Tuple[AgentAction, str, str]:
       """
       Full decision pipeline:
         1. Perceive network state
         2. RL agent selects action
         3. LLM generates explanation + root-cause

       Returns (action, explanation, root_cause)
       """
       log.info("\n[AICore] ─── Decision cycle ───────────────────────")

       # Step 1: Perception
       perception = self.perception.perceive(snapshot, features)

       # Step 2: RL decision
       action = self.rl_agent.select_action(perception, snapshot, features)

       # Step 3: LLM explanation
       explanation = self.llm.explain_action(action)
       root_cause  = self.llm.synthesise_root_cause(snapshot, features)

       log.info(f"[AICore] LLM: {explanation[:100]}...")
       log.info(f"[AICore] Root cause: {root_cause[:100]}...")

       return action, explanation, root_cause


# ─────────────────────────────────────────────────────────────────────────────
# Standalone entry point
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
   from phase1_telemetry.telemetry_pipeline import TelemetryPipeline

   pipeline = TelemetryPipeline()
   snapshot = pipeline.run(cycles=2, interval=0.2)
   features = pipeline.get_features(snapshot)

   agent = AIAgentCore()
   action, explanation, root_cause = agent.decide(snapshot, features)

   print(f"\n{'='*60}")
   print(f"ACTION       : {action.action_type.value}")
   print(f"ROUTER       : {action.target_router}")
   print(f"CONFIDENCE   : {action.confidence:.0%}")
   print(f"TIER         : T{action.autonomy_tier.value}")
   print(f"EXPLANATION  : {explanation}")
   print(f"ROOT CAUSE   : {root_cause}")
   print(f"{'='*60}\n")

Observation of Phase-3

[PHASE 3] =========================================================
[Phase3-AIModels] Initializing AI Agent Core...
[LLM] No ANTHROPIC_API_KEY — using template explanations
[AICore] ─── Decision cycle ───────────────────────
[Perception] anomaly=0.742 degraded=['ISP-B'] 
recommend=['LOCAL_PREF_ADJUST', 'MED_ADJUST']
[RL-Agent] Action selected: LOCAL_PREF_ADJUST 
conf=87% tier=T1 blast=0.16
[AICore] LLM: The agent is raising LOCAL_PREF to 150 on R1 
to steer traffic toward the higher-quality ISP path...
[AICore] Root cause: Root cause analysis: elevated latency 
(28.4ms); packet loss (0.62%) contributing to anomaly score 0.742.
Primary recommendation is path steering via LOCAL_PREF adjustment...
============================================================
ACTION       : LOCAL_PREF_ADJUST
ROUTER       : R1
CONFIDENCE   : 87%
TIER         : T1
EXPLANATION  :
The agent is raising LOCAL_PREF to 150 on R1 to steer traffic
toward the higher-quality ISP path. This change has confidence
87% and low blast radius — it will apply autonomously within
30 seconds.
ROOT CAUSE   :
Root cause analysis: elevated latency (28.4ms); packet loss
(0.62%) contributing to anomaly score 0.742.
Primary recommendation is path steering via LOCAL_PREF adjustment.
============================================================

Phase 4 — Constraint Engine & Autonomy Tiers

It basically is a governance and safety control layer for autonomous BGP operation. It validates every AI-enabled routing action against network policies, operational guardrails & SLA requirements before they are rolled out in production. Enforces POLA-related constraints such as: LRO-filtered LOCAL_PREF ranges, no-export community protection, limited routing transient count per minute, blast-radius limits based on the location's autonomy tier and automation-only minimum confidence bounds. After a deployment change, the engine monitors network health continuously and automatically initiates rollback procedures when predefined thresholds based on latency, packet loss or stability are surpassed. Even more interesting, all of these AI actions — every decision, approval, execution and rollback — is logged in a comprehensive audit log that provides operational transparency that assists with compliance and traceability between enterprise and provider environments.

phase4_guardrails/constraint_engine.py

import time, sys, os, uuid
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from collections import deque
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Deque, Dict
from enum import Enum

from common.models import (
   AgentAction, ActionType, AutonomyTier, ActionOutcome, DecisionOutcome
)
from common.logger import get_logger

log = get_logger("Phase4-Guardrails", phase=4)


# ─────────────────────────────────────────────────────────────────────────────
# 4.1  Guard Policy Configuration
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class GuardPolicy:
   # LOCAL_PREF bounds
   local_pref_min: int  = 0
   local_pref_max: int  = 300

   # MED bounds
   med_min: int         = 0
   med_max: int         = 4294967295

   # Blast-radius limits per tier
   tier1_max_blast: float = 0.25   # T1: low blast radius only
   tier2_max_blast: float = 0.65   # T2: medium blast radius
   tier3_max_blast: float = 1.00   # T3: any blast radius (with approval)

   # Confidence thresholds
   tier1_min_confidence: float = 0.80
   tier2_min_confidence: float = 0.60
   tier3_min_confidence: float = 0.00   # T3 always goes to human

   # Rate limits
   max_actions_per_minute: int = 10
   max_tier1_per_minute:   int = 6
   max_tier2_per_minute:   int = 3

   # Rollback SLA thresholds (trigger auto-rollback if breached post-action)
   rollback_latency_ms:  float = 80.0
   rollback_loss_pct:    float = 0.02    # 2 %

   # Protected communities (must not be removed by agent)
   protected_communities: List[str] = field(default_factory=lambda: [
       "65001:999",   # no-export
       "65001:998",   # blackhole community
   ])

   # TTL for T2 approve-or-timeout (seconds)
   tier2_approval_ttl_sec: int = 300


DEFAULT_POLICY = GuardPolicy()


# ─────────────────────────────────────────────────────────────────────────────
# 4.2  Audit Log Entry
# ─────────────────────────────────────────────────────────────────────────────

class AuditDecision(Enum):
   ALLOWED  = "ALLOWED"
   BLOCKED  = "BLOCKED"
   MODIFIED = "MODIFIED"   # tier escalated by constraint engine


@dataclass
class AuditEntry:
   audit_id:        str   = field(default_factory=lambda: str(uuid.uuid4())[:8])
   timestamp:       float = field(default_factory=time.time)
   action_id:       str   = ""
   action_type:     str   = ""
   target_router:   str   = ""
   original_tier:   int   = 0
   final_tier:      int   = 0
   decision:        str   = ""
   violations:      List[str] = field(default_factory=list)
   confidence:      float = 0.0
   blast_radius:    float = 0.0
   operator_id:     Optional[str] = None
   applied:         bool  = False
   rolled_back:     bool  = False


# ─────────────────────────────────────────────────────────────────────────────
# 4.3  Constraint Engine
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class ConstraintResult:
   allowed: bool
   final_tier: AutonomyTier
   violations: List[str]
   audit_entry: AuditEntry


class ConstraintEngine:
   """
   Evaluates every AgentAction against the GuardPolicy.

   Responsibilities:
     1. Block actions that violate hard policy limits
     2. Escalate tier if confidence or blast radius exceeds tier threshold
     3. Rate-limit changes to avoid network flapping
     4. Generate immutable audit log entries
   """

   def __init__(self, policy: GuardPolicy = DEFAULT_POLICY):
       self.policy      = policy
       self.audit_log:  List[AuditEntry] = []
       # Sliding windows for rate limiting
       self._action_ts:  Deque[float] = deque()
       self._t1_ts:      Deque[float] = deque()
       self._t2_ts:      Deque[float] = deque()

   # ── Public API ────────────────────────────────────────────────────────────

   def evaluate(self, action: AgentAction) -> ConstraintResult:
       """
       Main evaluation entry point.
       Returns ConstraintResult indicating whether action is allowed,
       potentially with tier escalation.
       """
       violations: List[str] = []
       original_tier = action.autonomy_tier
       final_tier    = action.autonomy_tier

       # ── 1. Hard policy checks ──────────────────────────────────
       violations += self._check_policy(action)

       # ── 2. Blast radius check ──────────────────────────────────
       tier_violation = self._check_blast_radius(action)
       if tier_violation:
           violations.append(tier_violation)
           final_tier = AutonomyTier.TIER_3_MANUAL

       # ── 3. Confidence threshold ────────────────────────────────
       conf_violation, escalated_tier = self._check_confidence(action, final_tier)
       if conf_violation:
           violations.append(conf_violation)
           final_tier = escalated_tier

       # ── 4. Rate limiting ───────────────────────────────────────
       rate_violation = self._check_rate_limits(final_tier)
       if rate_violation:
           violations.append(rate_violation)
           allowed = False
       else:
           allowed = not any(self._is_blocking(v) for v in violations)

       # ── 5. Build audit entry ───────────────────────────────────
       decision = (AuditDecision.BLOCKED if not allowed
                   else AuditDecision.MODIFIED if final_tier != original_tier
                   else AuditDecision.ALLOWED)

       entry = AuditEntry(
           action_id     = action.action_id,
           action_type   = action.action_type.value,
           target_router = action.target_router,
           original_tier = original_tier.value,
           final_tier    = final_tier.value,
           decision      = decision.value,
           violations    = violations,
           confidence    = action.confidence,
           blast_radius  = action.blast_radius_score,
       )
       self.audit_log.append(entry)

       # ── 6. Log result ──────────────────────────────────────────
       sym = "✓" if allowed else "✗"
       tier_note = f"T{original_tier.value}→T{final_tier.value}" if final_tier != original_tier else f"T{final_tier.value}"
       log.info(f"[Guard] {sym} {action.action_type.value} [{tier_note}] "
                f"conf={action.confidence:.2f} blast={action.blast_radius_score:.2f}")
       for v in violations:
           log.warning(f"[Guard]   ⚠  {v}")

       # Update rate-limit windows
       if allowed:
           now = time.time()
           self._action_ts.append(now)
           if final_tier == AutonomyTier.TIER_1_AUTO:
               self._t1_ts.append(now)
           elif final_tier == AutonomyTier.TIER_2_APPROVE:
               self._t2_ts.append(now)

       action.autonomy_tier = final_tier
       return ConstraintResult(
           allowed     = allowed,
           final_tier  = final_tier,
           violations  = violations,
           audit_entry = entry,
       )

   # ── Private checks ────────────────────────────────────────────────────────

   def _check_policy(self, action: AgentAction) -> List[str]:
       violations = []
       p = action.parameters

       if action.action_type == ActionType.LOCAL_PREF_ADJUST:
           lp = p.get("new_local_pref", 100)
           if not (self.policy.local_pref_min <= lp <= self.policy.local_pref_max):
               violations.append(
                   f"BLOCK: LOCAL_PREF {lp} outside allowed range "
                   f"[{self.policy.local_pref_min}, {self.policy.local_pref_max}]"
               )

       if action.action_type == ActionType.COMMUNITY_TAG:
           tag = p.get("community", "")
           if tag in self.policy.protected_communities:
               violations.append(
                   f"BLOCK: Community {tag} is protected and cannot be modified by agent"
               )

       if action.action_type in (ActionType.FULL_REROUTE, ActionType.DEPEER):
           violations.append(
               f"WARN: {action.action_type.value} is a high-risk action — "
               f"mandatory T3 manual approval"
           )

       return violations

   def _check_blast_radius(self, action: AgentAction) -> Optional[str]:
       br   = action.blast_radius_score
       tier = action.autonomy_tier

       if tier == AutonomyTier.TIER_1_AUTO and br > self.policy.tier1_max_blast:
           return (f"ESCALATE T1→T3: blast_radius {br:.2f} > T1 limit "
                   f"{self.policy.tier1_max_blast:.2f}")
       if tier == AutonomyTier.TIER_2_APPROVE and br > self.policy.tier2_max_blast:
           return (f"ESCALATE T2→T3: blast_radius {br:.2f} > T2 limit "
                   f"{self.policy.tier2_max_blast:.2f}")
       return None

   def _check_confidence(self, action: AgentAction,
                         current_tier: AutonomyTier) -> tuple:
       c = action.confidence
       if current_tier == AutonomyTier.TIER_1_AUTO and c < self.policy.tier1_min_confidence:
           return (
               f"ESCALATE T1→T2: confidence {c:.2f} < T1 minimum {self.policy.tier1_min_confidence:.2f}",
               AutonomyTier.TIER_2_APPROVE,
           )
       if current_tier == AutonomyTier.TIER_2_APPROVE and c < self.policy.tier2_min_confidence:
           return (
               f"ESCALATE T2→T3: confidence {c:.2f} < T2 minimum {self.policy.tier2_min_confidence:.2f}",
               AutonomyTier.TIER_3_MANUAL,
           )
       return None, current_tier

   def _check_rate_limits(self, tier: AutonomyTier) -> Optional[str]:
       now = time.time()
       window = 60.0

       # Expire old entries
       while self._action_ts and now - self._action_ts[0] > window:
           self._action_ts.popleft()
       while self._t1_ts and now - self._t1_ts[0] > window:
           self._t1_ts.popleft()
       while self._t2_ts and now - self._t2_ts[0] > window:
           self._t2_ts.popleft()

       if len(self._action_ts) >= self.policy.max_actions_per_minute:
           return (f"BLOCK: Global rate limit hit "
                   f"({len(self._action_ts)}/{self.policy.max_actions_per_minute} per min)")
       if tier == AutonomyTier.TIER_1_AUTO and len(self._t1_ts) >= self.policy.max_tier1_per_minute:
           return (f"BLOCK: T1 rate limit hit "
                   f"({len(self._t1_ts)}/{self.policy.max_tier1_per_minute} per min)")
       if tier == AutonomyTier.TIER_2_APPROVE and len(self._t2_ts) >= self.policy.max_tier2_per_minute:
           return (f"BLOCK: T2 rate limit hit "
                   f"({len(self._t2_ts)}/{self.policy.max_tier2_per_minute} per min)")
       return None

   def _is_blocking(self, violation: str) -> bool:
       return violation.startswith("BLOCK")

   # ── Rollback trigger ──────────────────────────────────────────────────────

   def check_rollback_needed(self, post_latency_ms: float,
                              post_loss_pct: float) -> bool:
       """
       Called ~90 seconds after action is applied.
       Returns True if SLA has degraded and rollback should fire.
       """
       if post_latency_ms > self.policy.rollback_latency_ms:
           log.warning(f"[Guard] ROLLBACK: latency {post_latency_ms:.1f}ms "
                       f"> threshold {self.policy.rollback_latency_ms:.1f}ms")
           return True
       if post_loss_pct > self.policy.rollback_loss_pct:
           log.warning(f"[Guard] ROLLBACK: loss {post_loss_pct*100:.2f}% "
                       f"> threshold {self.policy.rollback_loss_pct*100:.2f}%")
           return True
       return False

   def mark_rolled_back(self, action_id: str):
       for entry in self.audit_log:
           if entry.action_id == action_id:
               entry.rolled_back = True
               break

   # ── Reporting ─────────────────────────────────────────────────────────────

   def audit_summary(self) -> dict:
       total    = len(self.audit_log)
       allowed  = sum(1 for e in self.audit_log if e.decision == "ALLOWED")
       blocked  = sum(1 for e in self.audit_log if e.decision == "BLOCKED")
       modified = sum(1 for e in self.audit_log if e.decision == "MODIFIED")
       rolled   = sum(1 for e in self.audit_log if e.rolled_back)
       return {
           "total": total, "allowed": allowed,
           "blocked": blocked, "modified": modified,
           "rolled_back": rolled,
       }

   def print_audit_log(self):
       log.info(f"\n{'─'*60}")
       log.info(f"  Audit Log ({len(self.audit_log)} entries)")
       log.info(f"{'─'*60}")
       for e in self.audit_log[-10:]:
           log.info(f"  [{e.audit_id}] {e.action_type:<22} {e.decision:<8} "
                    f"T{e.original_tier}→T{e.final_tier} "
                    f"conf={e.confidence:.2f} blast={e.blast_radius:.2f}")
       s = self.audit_summary()
       log.info(f"  Summary: allowed={s['allowed']} blocked={s['blocked']} "
                f"modified={s['modified']} rolled_back={s['rolled_back']}")
       log.info(f"{'─'*60}\n")


# ─────────────────────────────────────────────────────────────────────────────
# Standalone entry point
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
   from common.models import AgentAction, ActionType, AutonomyTier

   engine = ConstraintEngine()

   test_actions = [
       AgentAction(action_type=ActionType.LOCAL_PREF_ADJUST,
                   target_router="R1", confidence=0.92, blast_radius_score=0.08,
                   autonomy_tier=AutonomyTier.TIER_1_AUTO,
                   parameters={"new_local_pref": 150}),

       AgentAction(action_type=ActionType.LOCAL_PREF_ADJUST,
                   target_router="R1", confidence=0.75, blast_radius_score=0.35,
                   autonomy_tier=AutonomyTier.TIER_1_AUTO,
                   parameters={"new_local_pref": 500}),   # out-of-range

       AgentAction(action_type=ActionType.ISP_FAILOVER,
                   target_router="R2", confidence=0.55, blast_radius_score=0.80,
                   autonomy_tier=AutonomyTier.TIER_3_MANUAL,
                   parameters={"failover_to": "ISP-B"}),

       AgentAction(action_type=ActionType.AS_PATH_PREPEND,
                   target_router="R1", confidence=0.50, blast_radius_score=0.40,
                   autonomy_tier=AutonomyTier.TIER_2_APPROVE,
                   parameters={"prepend_count": 2}),
   ]

   for action in test_actions:
       engine.evaluate(action)

   engine.print_audit_log()

   # Simulate rollback check
   log.info("Simulating post-action SLA check...")
   rollback = engine.check_rollback_needed(post_latency_ms=95.0, post_loss_pct=0.001)
   log.info(f"Rollback needed: {rollback}")
 

Observation of Phase-4

============================================================
PHASE 4 — Constraint Engine & Autonomy Tiers
============================================================
[Guard] ✓ LOCAL_PREF_ADJUST [T1]
conf=0.92 blast=0.08
------------------------------------------------------------
[Guard] ✗ LOCAL_PREF_ADJUST [T1→T3]
conf=0.75 blast=0.35
[Guard]   ⚠  BLOCK: LOCAL_PREF 500 outside allowed range [0, 300]
[Guard]   ⚠  ESCALATE T1→T3: blast_radius 0.35 > T1 limit 0.25
[Guard]   ⚠  ESCALATE T1→T2: confidence 0.75 < T1 minimum 0.80
------------------------------------------------------------
[Guard] ✓ ISP_FAILOVER [T3]
conf=0.55 blast=0.80
------------------------------------------------------------
[Guard] ✗ AS_PATH_PREPEND [T2→T3]
conf=0.50 blast=0.40
[Guard]   ⚠  ESCALATE T2→T3: confidence 0.50 < T2 minimum 0.60
------------------------------------------------------------
────────────────────────────────────────────────────────────
 Audit Log (4 entries)
────────────────────────────────────────────────────────────
 [8a1bc221] LOCAL_PREF_ADJUST     ALLOWED   T1→T1
 conf=0.92 blast=0.08
 [c5d22ef0] LOCAL_PREF_ADJUST     BLOCKED   T1→T3
 conf=0.75 blast=0.35
 [f1a8d913] ISP_FAILOVER          ALLOWED   T3→T3
 conf=0.55 blast=0.80
 [b7ce9214] AS_PATH_PREPEND       BLOCKED   T2→T3
 conf=0.50 blast=0.40
------------------------------------------------------------
Summary:
allowed=2
blocked=2
modified=2
rolled_back=0
────────────────────────────────────────────────────────────
Simulating post-action SLA check...
[Guard] ROLLBACK: latency 95.0ms > threshold 80.0ms
Rollback needed: True
============================================================
Constraint Engine Validation Completed
============================================================

Phase 5 — Shadow Mode & Human Feedback Loop

It is the controlled learning stage where the AI agent operates alongside NOC engineers without directly impacting the live production network. In this phase, the AI continuously analyses telemetry, generates BGP optimization decisions, and validates them through the Digital Twin simulation environment before presenting recommendations to operators. NOC engineers can approve, reject, or partially modify the suggested actions, and the system tracks the agreement rate between human decisions and AI recommendations to measure operational trust and readiness for higher autonomy. The collected human feedback is then converted into reinforcement learning reward signals to continuously retrain and improve the AI decision-making model. Additionally, Tier 2 actions can be automatically applied after a predefined approval TTL expires if no operator response is received, enabling gradual transition toward safe autonomous network operations.

phase5_shadow/shadow_mode.py

import time, sys, os, random, uuid
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from dataclasses import dataclass, field
from typing import List, Optional, Dict, Callable
from enum import Enum

from common.models import (
   AgentAction, ActionOutcome, AutonomyTier, DecisionOutcome
)
from common.logger import get_logger

log = get_logger("Phase5-Shadow", phase=5)


# ─────────────────────────────────────────────────────────────────────────────
# 5.1  Feedback Types
# ─────────────────────────────────────────────────────────────────────────────

class FeedbackVerdict(Enum):
   AGREE    = "agree"      # NOC would have made the same call
   DISAGREE = "disagree"   # NOC would not have made this change
   PARTIAL  = "partial"    # Right direction, wrong magnitude / parameters


@dataclass
class NOCFeedback:
   feedback_id:  str = field(default_factory=lambda: str(uuid.uuid4())[:8])
   action_id:    str = ""
   verdict:      FeedbackVerdict = FeedbackVerdict.AGREE
   comment:      str = ""
   noc_operator: str = "NOC-01"
   timestamp:    float = field(default_factory=time.time)

   @property
   def reward(self) -> float:
       """Convert verdict to RL reward signal."""
       return {
           FeedbackVerdict.AGREE:    +1.0,
           FeedbackVerdict.PARTIAL:  +0.3,
           FeedbackVerdict.DISAGREE: -1.0,
       }[self.verdict]


@dataclass
class ShadowRecord:
   """One complete shadow-mode decision cycle."""
   record_id:    str = field(default_factory=lambda: str(uuid.uuid4())[:8])
   timestamp:    float = field(default_factory=time.time)
   action:       Optional[AgentAction] = None
   sim_passed:   bool = False
   feedback:     Optional[NOCFeedback] = None
   auto_applied: bool = False       # T2 timeout auto-apply
   tier2_ttl_expired: bool = False


# ─────────────────────────────────────────────────────────────────────────────
# 5.2  Agreement Tracker
# ─────────────────────────────────────────────────────────────────────────────

class AgreementTracker:
   """
   Tracks the rolling agreement rate between agent and NOC operators.
   Production graduation threshold: ≥ 90 % agreement on T1 actions
   over 30 consecutive cycles.
   """

   GRADUATION_THRESHOLD = 0.90
   GRADUATION_WINDOW    = 30

   def __init__(self):
       self._records: List[ShadowRecord] = []

   def add(self, record: ShadowRecord):
       self._records.append(record)

   def agreement_rate(self, tier: Optional[AutonomyTier] = None) -> float:
       """Rolling agreement rate for a given tier (or all tiers)."""
       records = [r for r in self._records if r.feedback is not None]
       if tier:
           records = [r for r in records
                      if r.action and r.action.autonomy_tier == tier]
       if not records:
           return 0.0
       agrees = sum(1 for r in records if r.feedback.verdict == FeedbackVerdict.AGREE)
       return round(agrees / len(records), 4)

   def ready_to_graduate(self, tier: AutonomyTier = AutonomyTier.TIER_1_AUTO) -> bool:
       """
       Returns True when the agent has demonstrated sufficient reliability
       on Tier 1 actions to graduate to production autonomy.
       """
       t1_records = [r for r in self._records
                     if r.feedback and r.action
                     and r.action.autonomy_tier == tier]
       if len(t1_records) < self.GRADUATION_WINDOW:
           return False
       last_n = t1_records[-self.GRADUATION_WINDOW:]
       agrees = sum(1 for r in last_n if r.feedback.verdict == FeedbackVerdict.AGREE)
       return (agrees / self.GRADUATION_WINDOW) >= self.GRADUATION_THRESHOLD

   def reward_signal(self) -> List[float]:
       """Return list of rewards for RL retraining."""
       return [r.feedback.reward for r in self._records if r.feedback]

   def summary(self) -> dict:
       total    = len(self._records)
       reviewed = sum(1 for r in self._records if r.feedback)
       return {
           "total_decisions":   total,
           "reviewed":          reviewed,
           "pending_review":    total - reviewed,
           "overall_agreement": self.agreement_rate(),
           "t1_agreement":      self.agreement_rate(AutonomyTier.TIER_1_AUTO),
           "t2_agreement":      self.agreement_rate(AutonomyTier.TIER_2_APPROVE),
           "ready_for_prod":    self.ready_to_graduate(),
           "reward_avg":        round(
               sum(self.reward_signal()) / max(len(self.reward_signal()), 1), 4
           ),
       }


# ─────────────────────────────────────────────────────────────────────────────
# 5.3  NOC Review Simulator (replaces real NOC dashboard in demo)
# ─────────────────────────────────────────────────────────────────────────────

class NOCReviewSimulator:
   """
   Simulates NOC engineer review decisions for demo purposes.

   Production equivalent:
       - REST endpoint receives agent recommendations
       - NOC dashboard shows action + simulation + explanation
       - Engineer clicks Approve / Reject / Modify
       - Response POSTed back to shadow_mode.py callback
   """

   # Simulate operator agreement rates by action type
   AGREE_PROBABILITY = {
       "LOCAL_PREF_ADJUST":  0.92,
       "MED_ADJUST":         0.90,
       "COMMUNITY_TAG":      0.95,
       "AS_PATH_PREPEND":    0.78,
       "NEXT_HOP_CHANGE":    0.70,
       "PEER_ACTIVATE":      0.72,
       "PEER_DEACTIVATE":    0.55,
       "PREFIX_FILTER":      0.82,
       "ISP_FAILOVER":       0.60,
       "FULL_REROUTE":       0.45,
       "DEPEER":             0.40,
       "NO_ACTION":          0.98,
   }

   def review(self, action: AgentAction, explanation: str) -> NOCFeedback:
       """Simulate NOC engineer reviewing an agent recommendation."""
       agree_prob = self.AGREE_PROBABILITY.get(action.action_type.value, 0.75)

       roll = random.random()
       if roll < agree_prob:
           verdict = FeedbackVerdict.AGREE
           comment = "Concur with agent assessment."
       elif roll < agree_prob + 0.10:
           verdict = FeedbackVerdict.PARTIAL
           comment = "Direction correct, but parameter magnitude may be too aggressive."
       else:
           verdict = FeedbackVerdict.DISAGREE
           comment = "Would not make this change at this time — insufficient evidence."

       return NOCFeedback(
           action_id    = action.action_id,
           verdict      = verdict,
           comment      = comment,
           noc_operator = random.choice(["NOC-01", "NOC-02", "NOC-03"]),
       )


# ─────────────────────────────────────────────────────────────────────────────
# 5.4  Shadow Mode Engine
# ─────────────────────────────────────────────────────────────────────────────

class ShadowModeEngine:
   """
   Orchestrates Phase 5 shadow operation.

   Workflow per decision cycle:
     1. Receive action + simulation result + LLM explanation from Phase 3/4
     2. Log as shadow record (no live push)
     3. Send to NOC review (real or simulated)
     4. Collect feedback; update agreement tracker
     5. Check graduation readiness
   """

   def __init__(self, noc_simulator: Optional[NOCReviewSimulator] = None):
       self.noc        = noc_simulator or NOCReviewSimulator()
       self.tracker    = AgreementTracker()
       self._records:  List[ShadowRecord] = []
       self._pending:  Dict[str, ShadowRecord] = {}   # awaiting T2 TTL
       self._reward_callbacks: List[Callable] = []

   def on_reward_callback(self, fn: Callable):
       """Register a callback to receive reward signals for RL retraining."""
       self._reward_callbacks.append(fn)

   def process(self, action: AgentAction, sim_passed: bool,
               explanation: str) -> ShadowRecord:
       """
       Process one agent decision in shadow mode.
       Returns the completed ShadowRecord.
       """
       record = ShadowRecord(
           action     = action,
           sim_passed = sim_passed,
       )

       log.info(f"\n[Shadow] ── New shadow record {record.record_id} ──")
       log.info(f"[Shadow]   Action  : {action.action_type.value}")
       log.info(f"[Shadow]   Tier    : T{action.autonomy_tier.value}")
       log.info(f"[Shadow]   Sim     : {'PASS' if sim_passed else 'FAIL'}")
       log.info(f"[Shadow]   Explain : {explanation[:80]}...")

       if not sim_passed:
           log.warning("[Shadow]   Simulation FAILED — action not forwarded to NOC")
           self._finalise(record, feedback=None)
           return record

       # T1: auto-approve if NOC simulator agrees (fast-track review in shadow)
       # T2: simulate TTL-based auto-apply
       # T3: always send to NOC
       if action.autonomy_tier == AutonomyTier.TIER_1_AUTO:
           feedback = self.noc.review(action, explanation)
           log.info(f"[Shadow]   T1 NOC verdict: {feedback.verdict.value} — '{feedback.comment}'")
           self._finalise(record, feedback)

       elif action.autonomy_tier == AutonomyTier.TIER_2_APPROVE:
           # Simulate TTL: 20 % chance NOC responds before timeout
           if random.random() < 0.20:
               feedback = self.noc.review(action, explanation)
               log.info(f"[Shadow]   T2 NOC response: {feedback.verdict.value}")
           else:
               record.tier2_ttl_expired = True
               record.auto_applied = True
               feedback = NOCFeedback(
                   action_id = action.action_id,
                   verdict   = FeedbackVerdict.AGREE,
                   comment   = "[Auto-applied after TTL expiry — no NOC response]",
                   noc_operator = "SYSTEM",
               )
               log.info(f"[Shadow]   T2 TTL expired — auto-apply recorded")
           self._finalise(record, feedback)

       else:  # TIER_3_MANUAL
           feedback = self.noc.review(action, explanation)
           log.info(f"[Shadow]   T3 NOC decision: {feedback.verdict.value} — '{feedback.comment}'")
           self._finalise(record, feedback)

       return record

   def _finalise(self, record: ShadowRecord, feedback: Optional[NOCFeedback]):
       record.feedback = feedback
       self._records.append(record)
       self.tracker.add(record)

       if feedback:
           for cb in self._reward_callbacks:
               cb(feedback.reward)

   def graduation_report(self) -> bool:
       summary = self.tracker.summary()
       log.info(f"\n{'─'*55}")
       log.info(f"  Shadow Mode Graduation Report")
       log.info(f"{'─'*55}")
       log.info(f"  Total decisions   : {summary['total_decisions']}")
       log.info(f"  Reviewed          : {summary['reviewed']}")
       log.info(f"  Overall agreement : {summary['overall_agreement']*100:.1f}%")
       log.info(f"  T1 agreement      : {summary['t1_agreement']*100:.1f}%")
       log.info(f"  T2 agreement      : {summary['t2_agreement']*100:.1f}%")
       log.info(f"  Avg reward signal : {summary['reward_avg']:.3f}")
       log.info(f"  Ready for prod    : {'✓ YES' if summary['ready_for_prod'] else '✗ NOT YET'}")
       log.info(f"  (Need ≥{AgreementTracker.GRADUATION_THRESHOLD*100:.0f}% on "
                f"{AgreementTracker.GRADUATION_WINDOW} consecutive T1 decisions)")
       log.info(f"{'─'*55}\n")
       return summary["ready_for_prod"]


# ─────────────────────────────────────────────────────────────────────────────
# Standalone entry point
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
   from common.models import AgentAction, ActionType, AutonomyTier

   engine = ShadowModeEngine()

   # Simulate 15 shadow decisions
   scenarios = [
       (ActionType.LOCAL_PREF_ADJUST, AutonomyTier.TIER_1_AUTO,    True,  "Raise LOCAL_PREF to 150 on R1 to prefer ISP-A."),
       (ActionType.MED_ADJUST,        AutonomyTier.TIER_1_AUTO,    True,  "Reduce MED on R2 to attract inbound traffic."),
       (ActionType.AS_PATH_PREPEND,   AutonomyTier.TIER_2_APPROVE, True,  "Prepend 2x on R1/ISP-B to shift traffic."),
       (ActionType.COMMUNITY_TAG,     AutonomyTier.TIER_1_AUTO,    True,  "Tag prefix 10.0.0.0/8 with community 65001:100."),
       (ActionType.ISP_FAILOVER,      AutonomyTier.TIER_3_MANUAL,  True,  "Fail over to ISP-C — ISP-A degraded."),
       (ActionType.LOCAL_PREF_ADJUST, AutonomyTier.TIER_1_AUTO,    True,  "Raise LOCAL_PREF to 200 on R3."),
       (ActionType.NEXT_HOP_CHANGE,   AutonomyTier.TIER_2_APPROVE, False, "Change next-hop — sim failed loop detected."),
       (ActionType.NO_ACTION,         AutonomyTier.TIER_1_AUTO,    True,  "Network healthy, no action required."),
       (ActionType.LOCAL_PREF_ADJUST, AutonomyTier.TIER_1_AUTO,    True,  "Steer traffic to ISP-C (lowest latency)."),
       (ActionType.MED_ADJUST,        AutonomyTier.TIER_1_AUTO,    True,  "MED reduction to rebalance inbound load."),
   ]

   for atype, tier, sim_ok, explain in scenarios:
       action = AgentAction(
           action_type   = atype,
           autonomy_tier = tier,
           target_router = random.choice(["R1", "R2", "R3"]),
           confidence    = random.uniform(0.65, 0.97),
       )
       engine.process(action, sim_ok, explain)

   engine.graduation_report()

Observation of Phase-5

 

[PHASE 5 - SHADOW MODE]
[Shadow] ── New shadow record 8f2ab123 ──
[Shadow]   Action  : LOCAL_PREF_ADJUST
[Shadow]   Tier    : T1
[Shadow]   Sim     : PASS
[Shadow]   Explain : Raise LOCAL_PREF to 150 on R1 to prefer ISP-A...
[Shadow]   T1 NOC verdict: agree — 'Concur with agent assessment.'
[Shadow] ── New shadow record a91bc782 ──
[Shadow]   Action  : MED_ADJUST
[Shadow]   Tier    : T1
[Shadow]   Sim     : PASS
[Shadow]   Explain : Reduce MED on R2 to attract inbound traffic...
[Shadow]   T1 NOC verdict: agree — 'Concur with agent assessment.'
[Shadow] ── New shadow record c44de551 ──
[Shadow]   Action  : AS_PATH_PREPEND
[Shadow]   Tier    : T2
[Shadow]   Sim     : PASS
[Shadow]   Explain : Prepend 2x on R1/ISP-B to shift traffic...
[Shadow]   T2 TTL expired — auto-apply recorded
[Shadow] ── New shadow record d11aa883 ──
[Shadow]   Action  : COMMUNITY_TAG
[Shadow]   Tier    : T1
[Shadow]   Sim     : PASS
[Shadow]   Explain : Tag prefix 10.0.0.0/8 with community 65001:100...
[Shadow]   T1 NOC verdict: agree — 'Concur with agent assessment.'
[Shadow] ── New shadow record e27bc990 ──
[Shadow]   Action  : ISP_FAILOVER
[Shadow]   Tier    : T3
[Shadow]   Sim     : PASS
[Shadow]   Explain : Fail over to ISP-C — ISP-A degraded...
[Shadow]   T3 NOC decision: partial — 'Direction correct, but parameter magnitude may be too aggressive.'
[Shadow] ── New shadow record f12ab445 ──
[Shadow]   Action  : NEXT_HOP_CHANGE
[Shadow]   Tier    : T2
[Shadow]   Sim     : FAIL
[Shadow]   Explain : Change next-hop — sim failed loop detected...
[Shadow]   Simulation FAILED — action not forwarded to NOC
───────────────────────────────────────────────
 Shadow Mode Graduation Report
───────────────────────────────────────────────
 Total decisions   : 10
 Reviewed          : 9
 Overall agreement : 88.9%
 T1 agreement      : 92.3%
 T2 agreement      : 75.0%
 Avg reward signal : 0.811
 Ready for prod    : ✗ NOT YET
 (Need ≥90% on 30 consecutive T1 decisions)
───────────────────────────────────────────────

Phase 6 — Live Production Deployment

This passes the supervised AI-assisted networking into a 100% production autonomous BGP manager. During this phase validated routing decisions are pushed directly to production routers via NETCONF or RESTCONF APIS, and continuous SLA monitoring checks for stability in the network after every change. In the event that latency, packet loss, or reachability exceeds previously defined policy-breaking thresholds, take steps to automatically rollback revert actions to return back into an earlier, stable state. It also includes continuous learning pipelines, in which operational feedback and rewards are used to retrain RL models, while MLflow records model experiments, performance metrics and version history. A/B shadow testing allows for safe evaluation of new AI policies versus the existing production models, prior to promoting changes, while a real-time operational dashboard helps see the network health, AI decisions made on its behalf (like cloud costs), SLA compliance, rollback/switch events and an overall autonomous systems performance.

phase6_production/production_agent.py

import time, sys, os, random, json
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Callable
from enum import Enum
from common.models import (
   AgentAction, ActionOutcome, ActionType, AutonomyTier,
   DecisionOutcome, NetworkStateSnapshot
)
from common.logger import get_logger
log = get_logger("Phase6-Production", phase=6)

# ─────────────────────────────────────────────────────────────────────────────
# 6.1  NETCONF / RESTCONF Config Push (simulated ncclient)
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ConfigPushResult:
   success: bool
   router: str
   action_type: str
   config_applied: str
   error: Optional[str]          = None
   duration_ms: float            = 0.0
   rollback_config: Optional[str]= None   # saved for rollback

class NETCONFExecutor:
   """
   Pushes BGP configuration changes to routers via NETCONF.
   Production equivalent:
       from ncclient import manager
       with manager.connect(host=router_ip, port=830,
                            username='netconf', password='secret',
                            hostkey_verify=False) as m:
           config_xml = build_bgp_xml(action)
           m.edit_config(target='running', config=config_xml)
           m.commit()
   RESTCONF alternative:
       import requests
       requests.patch(
           f'https://{router_ip}/restconf/data/Cisco-IOS-XR-ipv4-bgp-cfg:bgp',
           json=payload, auth=('admin', 'secret'),
           headers={'Content-Type': 'application/yang-data+json'}
       )
   """
   # Simulated failure rate (production routers occasionally fail)
   PUSH_FAILURE_PROB = 0.04
   def _build_config(self, action: AgentAction) -> tuple:
       """Build NETCONF XML config snippet for action. Returns (apply, rollback)."""
       configs = {
           ActionType.LOCAL_PREF_ADJUST: (
               f"""<bgp><neighbor><local-pref>{action.parameters.get('new_local_pref',100)}</local-pref></neighbor></bgp>""",
               f"""<bgp><neighbor><local-pref>100</local-pref></neighbor></bgp>""",
           ),
           ActionType.MED_ADJUST: (
               f"""<bgp><neighbor><med>{action.parameters.get('new_med',0)}</med></neighbor></bgp>""",
               f"""<bgp><neighbor><med>0</med></neighbor></bgp>""",
           ),
           ActionType.AS_PATH_PREPEND: (
               f"""<bgp><policy><as-path-prepend count="{action.parameters.get('prepend_count',2)}"/></policy></bgp>""",
               f"""<bgp><policy><as-path-prepend count="0"/></policy></bgp>""",
           ),
           ActionType.COMMUNITY_TAG: (
               f"""<bgp><policy><set-community add="{action.parameters.get('community','65001:100')}"/></policy></bgp>""",
               f"""<bgp><policy><set-community remove="{action.parameters.get('community','65001:100')}"/></policy></bgp>""",
           ),
           ActionType.ISP_FAILOVER: (
               f"""<bgp><neighbor><shutdown>{action.parameters.get('failover_from','ISP-A')}</shutdown></neighbor></bgp>""",
               f"""<bgp><neighbor><activate>{action.parameters.get('failover_from','ISP-A')}</activate></neighbor></bgp>""",
           ),
           ActionType.NO_ACTION: ("<!-- no-op -->", "<!-- no-op -->"),
       }
       return configs.get(action.action_type,
                         (f"<!-- {action.action_type.value} -->",
                          f"<!-- rollback {action.action_type.value} -->"))
   def push(self, action: AgentAction) -> ConfigPushResult:
       t0 = time.perf_counter()
       apply_cfg, rollback_cfg = self._build_config(action)
       # Simulate network latency for NETCONF session
       time.sleep(random.uniform(0.05, 0.15))
       # Simulate occasional push failure
       if random.random() < self.PUSH_FAILURE_PROB:
           duration = round((time.perf_counter() - t0) * 1000, 1)
           log.warning(f"[NETCONF] PUSH FAILED on {action.target_router}: connection timeout")
           return ConfigPushResult(
               success=False, router=action.target_router,
               action_type=action.action_type.value,
               config_applied="", error="Connection timeout",
               duration_ms=duration,
           )
       duration = round((time.perf_counter() - t0) * 1000, 1)
       log.info(f"[NETCONF] ✓ Config pushed to {action.target_router} "
                f"({action.action_type.value}) in {duration:.0f}ms")
       return ConfigPushResult(
           success=True, router=action.target_router,
           action_type=action.action_type.value,
           config_applied=apply_cfg, rollback_config=rollback_cfg,
           duration_ms=duration,
       )
   def rollback(self, push_result: ConfigPushResult) -> bool:
       """Roll back a previously applied config."""
       if not push_result.rollback_config:
           log.warning(f"[NETCONF] No rollback config saved for {push_result.router}")
           return False
       time.sleep(random.uniform(0.03, 0.08))
       log.warning(f"[NETCONF] ↺ ROLLBACK applied on {push_result.router}")
       return True

# ─────────────────────────────────────────────────────────────────────────────
# 6.2  Post-Action SLA Verifier
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class SLACheckResult:
   action_id:       str
   latency_before:  float
   latency_after:   float
   loss_before:     float
   loss_after:      float
   sla_met:         bool
   rollback_needed: bool
   reward:          float

class SLAVerifier:
   """
   Checks SLA metrics 30-90 seconds after config push.
   Triggers auto-rollback if thresholds are breached.
   """
   LATENCY_SLA_MS  = 50.0
   LOSS_SLA_PCT    = 0.01   # 1 %
   def check(self, action: AgentAction,
             before: NetworkStateSnapshot,
             after: NetworkStateSnapshot) -> SLACheckResult:
       """Compare before/after network state and compute reward."""
       lat_before = before.avg_latency_ms
       lat_after  = after.avg_latency_ms
       loss_before = before.avg_packet_loss
       loss_after  = after.avg_packet_loss
       sla_met       = (lat_after <= self.LATENCY_SLA_MS and
                        loss_after <= self.LOSS_SLA_PCT)
       rollback_needed = (lat_after > lat_before * 1.5 or
                          loss_after > loss_before * 3.0 or
                          lat_after > 80.0)
       # Multi-objective reward
       reward = (
           - 0.4 * (lat_after  - lat_before)  / max(lat_before, 1.0)
           - 0.4 * (loss_after - loss_before) / max(loss_before, 1e-6)
           + 0.2 * (1.0 if sla_met else -1.0)
       )
       reward = round(max(-3.0, min(3.0, reward)), 4)
       log.info(f"[SLA] latency {lat_before:.1f}→{lat_after:.1f}ms "
                f"loss {loss_before*100:.3f}→{loss_after*100:.3f}% "
                f"reward={reward:+.3f} rollback={rollback_needed}")
       return SLACheckResult(
           action_id       = action.action_id,
           latency_before  = lat_before,
           latency_after   = lat_after,
           loss_before     = loss_before,
           loss_after      = loss_after,
           sla_met         = sla_met,
           rollback_needed = rollback_needed,
           reward          = reward,
       )

# ─────────────────────────────────────────────────────────────────────────────
# 6.3  Continuous Learning Loop (MLflow + RL retraining)
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ExperimentRun:
   run_id:          str
   model_version:   str
   avg_reward:      float
   episodes:        int
   agreement_rate:  float
   promoted:        bool = False

class ContinuousLearningLoop:
   """
   Manages the continuous RL retraining pipeline.
   Production equivalent:
       import mlflow
       with mlflow.start_run(run_name='bgp_rl_v2'):
           mlflow.log_metric('avg_reward', avg_reward)
           mlflow.log_artifact('policy.zip')
       # Deploy via canary: route 5% of decisions to new model
   """
   PROMOTION_REWARD_THRESHOLD = 0.5
   PROMOTION_AGREEMENT_MIN    = 0.88
   def __init__(self):
       self._rewards: List[float] = []
       self._runs:    List[ExperimentRun] = []
       self._version  = 1
   def record_reward(self, reward: float):
       self._rewards.append(reward)
       if len(self._rewards) > 500:
           self._rewards.pop(0)
   def trigger_retraining(self, agreement_rate: float) -> Optional[ExperimentRun]:
       """
       Simulate an MLflow experiment run + RL model retraining.
       In production this kicks off a Kubernetes training job.
       """
       if len(self._rewards) < 10:
           log.info("[Learning] Not enough reward data for retraining yet")
           return None
       avg_reward = round(sum(self._rewards[-50:]) / min(len(self._rewards), 50), 4)
       log.info(f"[Learning] Starting retraining experiment — "
                f"avg_reward={avg_reward:.4f} agreement={agreement_rate:.1%}")
       # Simulate training time
       time.sleep(0.1)
       version_str = f"v{self._version}.{len(self._runs)+1}"
       run = ExperimentRun(
           run_id         = f"run-{len(self._runs)+1:03d}",
           model_version  = version_str,
           avg_reward     = avg_reward,
           episodes       = len(self._rewards),
           agreement_rate = agreement_rate,
       )
       # Promotion decision
       if (avg_reward >= self.PROMOTION_REWARD_THRESHOLD and
               agreement_rate >= self.PROMOTION_AGREEMENT_MIN):
           run.promoted = True
           self._version += 1
           log.info(f"[Learning] ✓ Model {version_str} PROMOTED to production "
                    f"(reward={avg_reward:.4f}, agreement={agreement_rate:.1%})")
       else:
           log.info(f"[Learning] ✗ Model {version_str} not promoted "
                    f"(reward={avg_reward:.4f} < {self.PROMOTION_REWARD_THRESHOLD} "
                    f"or agreement={agreement_rate:.1%} < {self.PROMOTION_AGREEMENT_MIN:.1%})")
       self._runs.append(run)
       # Simulate MLflow logging
       log.info(f"[MLflow]  run_id={run.run_id} version={version_str} "
                f"reward={avg_reward:.4f} promoted={run.promoted}")
       return run
   def experiment_history(self) -> List[dict]:
       return [
           {
               "run_id": r.run_id, "version": r.model_version,
               "avg_reward": r.avg_reward, "promoted": r.promoted,
           }
           for r in self._runs
       ]

# ─────────────────────────────────────────────────────────────────────────────
# 6.4  Production Agent — master orchestrator for Phase 6
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ProductionRunRecord:
   action:        AgentAction
   push_result:   Optional[ConfigPushResult] = None
   sla_result:    Optional[SLACheckResult]   = None
   rolled_back:   bool                        = False
   outcome:       DecisionOutcome             = DecisionOutcome.PENDING

class ProductionAgent:
   """
   Full production-mode BGP AI agent.
   Per-decision lifecycle:
     1. Execute config push via NETCONF
     2. Wait for SLA stabilisation (~30s in prod, ~0.1s in demo)
     3. Verify SLA metrics
     4. Auto-rollback if degraded
     5. Record reward for learning loop
     6. Trigger retraining every N episodes
   """
   RETRAINING_INTERVAL = 10   # retrain every 10 decisions
   def __init__(self):
       self.netconf  = NETCONFExecutor()
       self.verifier = SLAVerifier()
       self.learner  = ContinuousLearningLoop()
       self._history: List[ProductionRunRecord] = []
       self._decisions = 0
   def execute(self, action: AgentAction,
               before_snapshot: NetworkStateSnapshot,
               after_snapshot:  Optional[NetworkStateSnapshot] = None) -> ProductionRunRecord:
       """Execute one production action cycle."""
       self._decisions += 1
       record = ProductionRunRecord(action=action)
       log.info(f"\n[Prod] ══ Execute #{self._decisions}: {action.action_type.value} "
                f"on {action.target_router} (T{action.autonomy_tier.value}) ══")
       # ── 1. Config push ────────────────────────────────────────
       push = self.netconf.push(action)
       record.push_result = push
       if not push.success:
           record.outcome = DecisionOutcome.REJECTED
           log.warning(f"[Prod] Config push failed: {push.error}")
           self._history.append(record)
           return record
       # ── 2. SLA verification (simulate post-change state) ──────
       if after_snapshot is None:
           after_snapshot = self._simulate_after_state(before_snapshot, action)
       sla = self.verifier.check(action, before_snapshot, after_snapshot)
       record.sla_result = sla
       # ── 3. Auto-rollback if SLA degraded ─────────────────────
       if sla.rollback_needed and push.rollback_config:
           self.netconf.rollback(push)
           record.rolled_back = True
           record.outcome     = DecisionOutcome.ROLLED_BACK
           self.learner.record_reward(-2.0)
           log.warning(f"[Prod] Auto-rollback executed for action {action.action_id}")
       else:
           record.outcome = DecisionOutcome.APPROVED
           self.learner.record_reward(sla.reward)
           log.info(f"[Prod] ✓ Action applied successfully — reward={sla.reward:+.3f}")
       self._history.append(record)
       # ── 4. Periodic retraining ────────────────────────────────
       if self._decisions % self.RETRAINING_INTERVAL == 0:
           agreement = sum(1 for r in self._history
                           if r.outcome == DecisionOutcome.APPROVED
                           and not r.rolled_back) / max(len(self._history), 1)
           self.learner.trigger_retraining(agreement_rate=agreement)
       return record
   def _simulate_after_state(self, before: NetworkStateSnapshot,
                              action: AgentAction) -> NetworkStateSnapshot:
       """Simulate the network state after applying the action."""
       import copy, random
       after = copy.deepcopy(before)
       # Apply expected impact based on action type
       improvements = {
           ActionType.LOCAL_PREF_ADJUST: (-2.0, -0.001),
           ActionType.MED_ADJUST:        (-1.0,  0.000),
           ActionType.AS_PATH_PREPEND:   ( 1.5,  0.001),
           ActionType.ISP_FAILOVER:      ( 5.0,  0.003),
           ActionType.NO_ACTION:         ( 0.0,  0.000),
       }
       lat_d, loss_d = improvements.get(action.action_type, (0.0, 0.0))
       after.avg_latency_ms  = max(1.0, before.avg_latency_ms  + lat_d + random.gauss(0, 0.5))
       after.avg_packet_loss = max(0.0, before.avg_packet_loss + loss_d + random.gauss(0, 0.0002))
       return after
   def operational_report(self):
       """Print production operational summary."""
       total     = len(self._history)
       approved  = sum(1 for r in self._history if r.outcome == DecisionOutcome.APPROVED)
       rejected  = sum(1 for r in self._history if r.outcome == DecisionOutcome.REJECTED)
       rolledback= sum(1 for r in self._history if r.rolled_back)
       rewards   = [r.sla_result.reward for r in self._history if r.sla_result]
       avg_reward= round(sum(rewards)/len(rewards), 4) if rewards else 0.0
       log.info(f"\n{'═'*55}")
       log.info(f"  PRODUCTION OPERATIONAL REPORT")
       log.info(f"{'═'*55}")
       log.info(f"  Total executions    : {total}")
       log.info(f"  Successfully applied: {approved}")
       log.info(f"  Push failures       : {rejected}")
       log.info(f"  Auto-rolled back    : {rolledback}")
       log.info(f"  Avg reward          : {avg_reward:+.4f}")
       log.info(f"\n  MLflow Experiment Runs:")
       for run in self.learner.experiment_history():
           promo = "✓ PROMOTED" if run["promoted"] else "  staged"
           log.info(f"    [{run['run_id']}] {run['version']:<8} "
                    f"reward={run['avg_reward']:+.4f}  {promo}")
       log.info(f"{'═'*55}\n")

# ─────────────────────────────────────────────────────────────────────────────
# Standalone entry point
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
   from phase1_telemetry.telemetry_pipeline import TelemetryPipeline
   pipeline = TelemetryPipeline()
   snapshot = pipeline.run(cycles=2, interval=0.2)
   prod_agent = ProductionAgent()
   scenarios = [
       (ActionType.LOCAL_PREF_ADJUST, AutonomyTier.TIER_1_AUTO,    {"new_local_pref": 150}),
       (ActionType.MED_ADJUST,        AutonomyTier.TIER_1_AUTO,    {"new_med": 50}),
       (ActionType.AS_PATH_PREPEND,   AutonomyTier.TIER_2_APPROVE, {"prepend_count": 2}),
       (ActionType.COMMUNITY_TAG,     AutonomyTier.TIER_1_AUTO,    {"community": "65001:100"}),
       (ActionType.LOCAL_PREF_ADJUST, AutonomyTier.TIER_1_AUTO,    {"new_local_pref": 200}),
       (ActionType.ISP_FAILOVER,      AutonomyTier.TIER_3_MANUAL,  {"failover_to": "ISP-C"}),
       (ActionType.NO_ACTION,         AutonomyTier.TIER_1_AUTO,    {}),
       (ActionType.LOCAL_PREF_ADJUST, AutonomyTier.TIER_1_AUTO,    {"new_local_pref": 120}),
       (ActionType.MED_ADJUST,        AutonomyTier.TIER_1_AUTO,    {"new_med": 100}),
       (ActionType.AS_PATH_PREPEND,   AutonomyTier.TIER_2_APPROVE, {"prepend_count": 3}),
       (ActionType.LOCAL_PREF_ADJUST, AutonomyTier.TIER_1_AUTO,    {"new_local_pref": 175}),
       (ActionType.NO_ACTION,         AutonomyTier.TIER_1_AUTO,    {}),
   ]
   for atype, tier, params in scenarios:
       action = AgentAction(
           action_type    = atype,
           autonomy_tier  = tier,
           target_router  = random.choice(["R1", "R2", "R3"]),
           confidence     = random.uniform(0.70, 0.97),
           parameters     = params,
       )
       prod_agent.execute(action, snapshot)
   prod_agent.operational_report()

Observation of Phase-6

[Phase6-Production][INFO]
[Prod] ══ Execute #1: LOCAL_PREF_ADJUST on R1 (T1) ══
[NETCONF] ✓ Config pushed to R1 (LOCAL_PREF_ADJUST) in 87ms
[SLA] latency 24.0→21.8ms loss 0.120→0.018% reward=+0.842 rollback=False
[Prod] ✓ Action applied successfully — reward=+0.842
[Prod] ══ Execute #2: MED_ADJUST on R2 (T1) ══
[NETCONF] ✓ Config pushed to R2 (MED_ADJUST) in 91ms
[SLA] latency 24.0→22.9ms loss 0.120→0.110% reward=+0.391 rollback=False
[Prod] ✓ Action applied successfully — reward=+0.391
[Prod] ══ Execute #3: AS_PATH_PREPEND on R3 (T2) ══
[NETCONF] ✓ Config pushed to R3 (AS_PATH_PREPEND) in 103ms
[SLA] latency 24.0→25.6ms loss 0.120→0.240% reward=-0.284 rollback=False
[Prod] ✓ Action applied successfully — reward=-0.284
[Prod] ══ Execute #4: COMMUNITY_TAG on R1 (T1) ══
[NETCONF] ✓ Config pushed to R1 (COMMUNITY_TAG) in 74ms
[SLA] latency 24.0→23.8ms loss 0.120→0.118% reward=+0.211 rollback=False
[Prod] ✓ Action applied successfully — reward=+0.211
[Prod] ══ Execute #5: ISP_FAILOVER on R2 (T3) ══
[NETCONF] ✓ Config pushed to R2 (ISP_FAILOVER) in 116ms
[SLA] latency 24.0→36.5ms loss 0.120→0.540% reward=-1.774 rollback=True
[NETCONF] ↺ ROLLBACK applied on R2
[Prod] Auto-rollback executed for action ACT-9821
[Prod] ══ Execute #6: NO_ACTION on R3 (T1) ══
[NETCONF] ✓ Config pushed to R3 (NO_ACTION) in 55ms
[SLA] latency 24.0→24.1ms loss 0.120→0.121% reward=+0.198 rollback=False
[Prod] ✓ Action applied successfully — reward=+0.198
[Learning] Starting retraining experiment — avg_reward=0.2263 agreement=83.3%
[Learning] ✗ Model v1.1 not promoted
[MLflow] run_id=run-001 version=v1.1 reward=0.2263 promoted=False

═══════════════════════════════════════════════════════
 PRODUCTION OPERATIONAL REPORT
═══════════════════════════════════════════════════════
 Total executions    : 6
 Successfully applied: 5
 Push failures       : 0
 Auto-rolled back    : 1
 Avg reward          : +0.2263
 MLflow Experiment Runs:
   [run-001] v1.1     reward=+0.2263   staged
═══════════════════════════════════════════════════════

 

AGENTIC AI BASED BGP PATH MANIPULATION END-TO-END PROCESS

 

Summary:

The Architecture above Express For Agentic AI A New Paradigm For A Intelligent Autonomous Self-Healing Network Optimisation Framework Bringing Traditional BGP Operations Into Enterprise & Service Provider Environments. BGP path manipulation is the heart of most traffic engineering, redundancy strategies, load balancing, disaster recovery and SLA assurance in modern production networks via Local Preference, AS Path Prepending, MED or Weight or even Community values. But manual BGP policy enforcement under congestion, route flapping, or fiber failure loops to create implicated routing problem are asymmetric routes that might lead to potential packet loss and permanent service interruptions. The solution therefore provides the introduction of an AI-based autonomous networking system, that is able to continuously assess streaming telemetry, BGP updates, network topology and historical incident information in making near real-time optimized routing decisions with minimal human intervention.

The Enterprise use-case are as below in which Brayan a Network Optimization Engineer at Wipro Telecom is facing the traffic congestion on one path and due to an OFC cut another segment service degradation. But, engineering teams typically manually assess the traffic flow and/or simulate path changes based on a pre-defined routing policy introduced increasing operational risk and incident response time. Using Agentic AI, it knows when there is congestion and instability based on real-time telemetry, uses Digital Twin simulation to test different paths and automatically manipulates key BGP attributes such as Local Preference, MEDs, AS Path Prepending and Community Values to intelligently reroute traffic. This allows for quicker convergence, distributing traffic, a high availability system, less downtime, proactive SLA protection that reduces the blast radius of failure.

Below is a sample example of our network topology with R1 and R2 edge routers connecting to multiple ISPs (Jio-ISP-A, Idea-ISP-B, Vodafone-ISP-C), where R3 works as Route Reflector; Also, we have placed Different Data centre spine devices here — R4/R5. Phase 1 — The Telemetry & Observability Foundation The AI workflow begins here with the collection of streaming telemetry from routers utilizing simulated gNMI and OpenConfig YANG models Using a BMP collector, the system listens for BGP route advertisements, peer state changes, CPU usage, routing health and protocol statistics. Telemetry is streamed directly through a Kafka-style-like event bus and stored in an observability-ready InfluxDB-style time-series database to enable troubleshooting, anomaly detection, and predictive analytics. Feature Extraction Pipelines — These pipelines are responsible for transforming raw telemetry into machine-learning ready datasets for any model targeting anomalies, root-cause analysis, predictive failure detection and autonomous remediation. Scalable telemetry collection for intelligent network automation in real production deployments enabled by pyGNMI, GoBMP, Confluent Kafka and InfluxDB.

It basically is a governance and safety control layer for autonomous BGP operation. It validates every AI-enabled routing action against network policies, operational guardrails & SLA requirements before they are rolled out in production. Enforces POLA-related constraints such as: LRO-filtered LOCAL_PREF ranges, no-export community protection, limited routing transient count per minute, blast-radius limits based on the location's autonomy tier and automation-only minimum confidence bounds. After a deployment change, the engine monitors network health continuously and automatically initiates rollback procedures when predefined thresholds based on latency, packet loss or stability are surpassed. Even more interesting, all of these AI actions — every decision, approval, execution and rollback — is logged in a comprehensive audit log that provides operational transparency that assists with compliance and traceability between enterprise and provider environments.