Alright, let's put the concepts from this chapter into practice. We'll set up a basic monitoring pipeline using MLflow for tracking metrics and Grafana for visualizing them in near real-time. This exercise demonstrates how these tools can work together, with MLflow providing detailed logging for analysis and model lineage, while Grafana offers operational dashboarding capabilities often powered by a time-series database like Prometheus.
For this practice, we assume you have a working Python environment with mlflow, prometheus_client, and Flask installed (pip install mlflow prometheus_client Flask), and Docker installed for running Prometheus and Grafana.
We'll simulate a simple prediction service. For each prediction request, we want to:
- Log the raw prediction value to MLflow, building a detailed historical record tied to a run.
- Maintain a rolling average of recent predictions and expose it as a Prometheus gauge so it can be visualized in Grafana.
First, let's create a simple Python script (prediction_service.py) that simulates making predictions and logs metrics using both MLflow and the Prometheus client library. We'll use Flask to create a minimal web endpoint that Prometheus can scrape.
import random
import time
from threading import Thread

import mlflow
from flask import Flask, Response
from prometheus_client import CollectorRegistry, Gauge, generate_latest

# --- Configuration ---
MLFLOW_TRACKING_URI = "http://127.0.0.1:5000"  # Default local MLflow tracking server URI
EXPERIMENT_NAME = "Production Monitoring Simulation"
SERVICE_PORT = 8080  # Port for the Flask app exposing Prometheus metrics

# --- MLflow Setup ---
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

# Ensure the experiment exists
experiment_id = None  # Handle case where the MLflow server isn't running
try:
    client = mlflow.tracking.MlflowClient()
    experiment = client.get_experiment_by_name(EXPERIMENT_NAME)
    if experiment is None:
        experiment_id = client.create_experiment(EXPERIMENT_NAME)
    else:
        experiment_id = experiment.experiment_id
    mlflow.set_experiment(EXPERIMENT_NAME)
except Exception as e:
    print(f"Could not connect to MLflow tracking server at {MLFLOW_TRACKING_URI} "
          f"or create the experiment. Please ensure it's running. Error: {e}")

# --- Prometheus Setup ---
# Use a dedicated registry to manage this service's metrics
prometheus_registry = CollectorRegistry()

# Define a Gauge metric to track the average prediction value
avg_prediction_gauge = Gauge(
    'average_prediction_value',
    'Average value of predictions made by the service',
    registry=prometheus_registry
)

# --- Flask App for Prometheus Metrics ---
app = Flask(__name__)


@app.route('/metrics')
def metrics():
    """Exposes Prometheus metrics in the text exposition format."""
    return Response(generate_latest(prometheus_registry), mimetype='text/plain')


# --- Simulation Logic ---
def simulate_predictions():
    """Simulates making predictions and logging metrics."""
    print(f"Starting prediction simulation. Logging to MLflow experiment: '{EXPERIMENT_NAME}'")
    prediction_values = []
    max_history = 50  # Keep a rolling window for average calculation

    with mlflow.start_run(experiment_id=experiment_id, run_name="Simulated Production Run") as run:
        print(f"MLflow Run ID: {run.info.run_id}")
        mlflow.log_param("simulation_type", "average_prediction_monitoring")

        step = 0
        while True:
            # Simulate a new prediction (e.g., score, probability, regression output)
            # whose value drifts gradually upwards over time
            base_value = 50
            drift = step * 0.1          # Gradual drift upwards
            noise = random.gauss(0, 5)  # Add some noise
            prediction = base_value + drift + noise
            prediction = max(0, min(100, prediction))  # Clamp between 0 and 100

            # Log the individual prediction to MLflow
            mlflow.log_metric("prediction_value", prediction, step=step)

            # Update the rolling window used for the average calculation
            prediction_values.append(prediction)
            if len(prediction_values) > max_history:
                prediction_values.pop(0)

            # Calculate the rolling average and update the Prometheus Gauge
            current_avg = sum(prediction_values) / len(prediction_values)
            avg_prediction_gauge.set(current_avg)

            # Optionally, log the average to MLflow as well
            mlflow.log_metric("average_prediction_value_gauge", current_avg, step=step)

            print(f"Step {step}: Prediction={prediction:.2f}, Current Avg={current_avg:.2f}")
            step += 1
            time.sleep(5)  # Wait 5 seconds between simulated predictions


if __name__ == '__main__':
    # Serve the /metrics endpoint from a background thread so that the gauge
    # updated by the simulation loop is exposed from the same process
    # that Prometheus scrapes.
    metrics_thread = Thread(
        target=lambda: app.run(host='0.0.0.0', port=SERVICE_PORT, debug=False),
        daemon=True
    )
    metrics_thread.start()

    # Run the simulation in the main thread
    simulate_predictions()
Before running, start the MLflow tracking server in a separate terminal:
mlflow ui
This usually starts the server at http://127.0.0.1:5000.
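If you prefer to run the tracking server explicitly rather than through the UI shortcut, an equivalent local setup (a sketch using the standard MLflow CLI, with the default file-based backend store) is:
mlflow server --host 127.0.0.1 --port 5000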
You also need to serve the /metrics endpoint so Prometheus can scrape it. In the script above, the Flask development server is started in a background thread from the __main__ block, so a single process runs both the simulation loop and the metrics endpoint; keeping them in one process is what lets the gauge updated by the loop show up in the scraped output. Save the script as prediction_service.py and run it:
python prediction_service.py
(Ensure Flask is installed: pip install Flask.) You should now be able to access http://localhost:8080/metrics and see the average_prediction_value metric.
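The response from the /metrics endpoint should look roughly like the following (the exact value will vary from scrape to scrape):
# HELP average_prediction_value Average value of predictions made by the service
# TYPE average_prediction_value gauge
average_prediction_value 53.7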
Now we need Prometheus to scrape the /metrics endpoint. Create a simple prometheus.yml configuration file:
# prometheus.yml
global:
  scrape_interval: 10s  # Scrape targets every 10 seconds

scrape_configs:
  - job_name: 'prediction_service'
    static_configs:
      # For Docker Desktop on Mac/Windows:
      - targets: ['host.docker.internal:8080']
      # If running Docker on Linux, use your host machine's IP instead of
      # host.docker.internal, e.g. - targets: ['172.17.0.1:8080']
      # (find your Docker bridge IP if needed).
      # Or, if running Prometheus directly on the host: - targets: ['localhost:8080']
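Optionally, you can validate the configuration before starting Prometheus. A quick check, assuming you have promtool available (it is bundled with Prometheus and also ships inside the prom/prometheus image):
promtool check config prometheus.yml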
Note on host.docker.internal: This special DNS name resolves to the host machine's IP address from within a Docker container on Docker Desktop (Mac/Windows). If you are using Docker on Linux, you might need to use the host's IP address on the Docker bridge network (often 172.17.0.1) or configure networking differently. If running Prometheus directly on the host (not in Docker), use localhost:8080.
Run Prometheus using Docker, mounting your configuration file:
docker run -d --name prometheus \
-p 9090:9090 \
-v $(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml \
prom/prometheus
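If you are on Linux and want to keep host.docker.internal in prometheus.yml, recent Docker Engine releases (20.10 and later) let you map that name to the host explicitly. A sketch of the same command with the extra flag (remove the original container first with docker rm -f prometheus if you already started it):
docker run -d --name prometheus \
  --add-host=host.docker.internal:host-gateway \
  -p 9090:9090 \
  -v $(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml \
  prom/prometheus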
You should now be able to access the Prometheus UI at http://localhost:9090. Check the "Targets" page to confirm Prometheus is successfully scraping your prediction_service endpoint. You can also query the metric average_prediction_value in the Prometheus query browser.
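In the query browser you can try both the raw gauge and a smoothed view of it, for example with these standard PromQL expressions (the 5m window is an arbitrary choice for this exercise):
average_prediction_value
avg_over_time(average_prediction_value[5m])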
Run Grafana using Docker:
docker run -d --name grafana \
-p 3000:3000 \
grafana/grafana-oss
Access Grafana at http://localhost:3000 (the default login is admin/admin; you'll be prompted to change the password).
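Because the Prometheus and Grafana containers above were started on Docker's default bridge network, Grafana cannot reach Prometheus by container name out of the box. One option, sketched here with standard Docker commands, is to attach both containers to a shared user-defined network so that http://prometheus:9090 resolves from inside the Grafana container:
docker network create monitoring
docker network connect monitoring prometheus
docker network connect monitoring grafana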
In Grafana:
- Add a Prometheus data source. If you attached both containers to the shared network above, use http://prometheus:9090 as the URL; on Docker Desktop you can instead point the Grafana container at http://host.docker.internal:9090. If Grafana is running on the host and Prometheus is in Docker with port 9090 published, use http://localhost:9090.
- Create a new dashboard, add a panel, and set its query to average_prediction_value.
You should now see a graph visualizing the average prediction value, updating roughly every 10 seconds (based on the Prometheus scrape interval), reflecting the drift we introduced in the simulation.
Visualization of the average_prediction_value metric as seen in a Grafana panel, showing an upward trend over time due to simulated drift.
This exercise provides a foundational setup. We used:
- MLflow to log fine-grained, per-prediction metrics (prediction_value) along with configuration parameters and aggregated metrics (average_prediction_value_gauge). This creates a detailed historical record tied to specific runs, invaluable for debugging, auditing, and retraining analysis. You can explore these runs in the MLflow UI, or read them back programmatically as sketched at the end of this section.
- Prometheus and Grafana for near real-time operational dashboarding of an aggregated metric (average_prediction_value). Prometheus is designed for efficient time-series data storage and querying, making it suitable for high-frequency monitoring.
This hands-on example illustrates how distinct tools can be combined to build a layered monitoring strategy, addressing both immediate operational health and longer-term model performance analysis; the same approach extends to more complex production scenarios.
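As a final optional step, the per-prediction history logged above can be pulled back out of MLflow for offline analysis. A minimal sketch, assuming the tracking server is still running and using a placeholder for the run ID printed by prediction_service.py:
from mlflow.tracking import MlflowClient

client = MlflowClient(tracking_uri="http://127.0.0.1:5000")
run_id = "<paste the run ID printed by prediction_service.py>"  # hypothetical placeholder

# Retrieve every logged value of the per-prediction metric
history = client.get_metric_history(run_id, "prediction_value")
for m in history:
    print(f"step={m.step} value={m.value:.2f} timestamp={m.timestamp}")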