Experiência 5: Análise e Visualização de Dados
Experiência 5: Análise e Visualização de Dados Clínicos
Objetivo
Capacitar os estudantes na construção de dashboards interativos e sistemas de análise para dados biomédicos, utilizando ferramentas modernas de Business Intelligence e visualização de dados. A experiência enfatiza a transformação de dados clínicos em insights acionáveis para apoio à decisão médica.
Objetivos de Aprendizagem
- Construir dashboards interativos com dados clínicos em tempo real
- Implementar KPIs (Key Performance Indicators) clínicos relevantes
- Desenvolver sistemas de alertas automáticos baseados em regras
- Aplicar técnicas de análise exploratória de dados (EDA)
- Integrar múltiplas fontes de dados de saúde
- Avaliar usabilidade e eficácia de interfaces de visualização
Resultados de Aprendizagem
Ao completar esta experiência, os estudantes serão capazes de:
- Conectar ferramentas de BI a servidores FHIR e bases de dados clínicas
- Criar visualizações eficazes para sinais vitais e tendências
- Implementar alertas automáticos para valores críticos
- Desenvolver mapas de calor e análises geoespaciais
- Construir relatórios executivos para gestão hospitalar
- Implementar dashboards em tempo real com refresh automático
Exercício Prático Detalhado
Fase 1: Conexão e Preparação de Dados (45 min)
Configuração de fontes de dados múltiplas:
import pandas as pd
import requests
import sqlite3
from sqlalchemy import create_engine
import json
class HealthDataConnector:
def __init__(self, fhir_url, redcap_token, db_connection):
self.fhir_url = fhir_url
self.redcap_token = redcap_token
self.db_engine = create_engine(db_connection)
def fetch_fhir_observations(self, date_range=None, patient_ids=None):
"""Buscar observações do servidor FHIR"""
params = {"_count": 1000, "_sort": "-date"}
if date_range:
params["date"] = f"ge{date_range['start']}&date=le{date_range['end']}"
if patient_ids:
params["subject"] = ",".join([f"Patient/{pid}" for pid in patient_ids])
response = requests.get(f"{self.fhir_url}/Observation", params=params)
observations = []
for entry in response.json().get('entry', []):
obs = entry['resource']
# Extrair dados estruturados
observation_data = {
'id': obs.get('id'),
'patient_id': obs.get('subject', {}).get('reference', '').replace('Patient/', ''),
'datetime': obs.get('effectiveDateTime'),
'code': obs.get('code', {}).get('coding', [{}])[0].get('code'),
'display': obs.get('code', {}).get('coding', [{}])[0].get('display'),
'value': obs.get('valueQuantity', {}).get('value'),
'unit': obs.get('valueQuantity', {}).get('unit'),
'status': obs.get('status'),
'device_id': obs.get('device', {}).get('reference', '').replace('Device/', '')
}
observations.append(observation_data)
return pd.DataFrame(observations)
def fetch_redcap_data(self, forms=None):
"""Buscar dados estruturados do RedCap"""
payload = {
'token': self.redcap_token,
'content': 'record',
'action': 'export',
'format': 'json',
'type': 'flat'
}
if forms:
payload['forms'] = ','.join(forms)
response = requests.post("https://redcap.med.up.pt/api/", data=payload)
return pd.DataFrame(response.json())
def create_unified_dataset(self):
"""Criar dataset unificado com dados de todas as fontes"""
# Buscar dados das últimas 24 horas
from datetime import datetime, timedelta
end_date = datetime.now()
start_date = end_date - timedelta(days=1)
# FHIR observations
fhir_data = self.fetch_fhir_observations({
'start': start_date.strftime('%Y-%m-%d'),
'end': end_date.strftime('%Y-%m-%d')
})
# RedCap demographic data
redcap_data = self.fetch_redcap_data(['demographics', 'vitals'])
# Merge datasets por patient_id
if not fhir_data.empty and not redcap_data.empty:
unified = pd.merge(
fhir_data,
redcap_data,
left_on='patient_id',
right_on='patient_id',
how='left'
)
else:
unified = fhir_data
# Adicionar colunas calculadas
unified['datetime'] = pd.to_datetime(unified['datetime'])
unified['hour'] = unified['datetime'].dt.hour
unified['day_of_week'] = unified['datetime'].dt.day_name()
# Calcular alertas
unified['alert_level'] = unified.apply(self.calculate_alert_level, axis=1)
return unified
def calculate_alert_level(self, row):
"""Calcular nível de alerta baseado nos valores"""
if row['code'] == '8867-4': # Heart rate
if row['value'] < 50 or row['value'] > 120:
return 'critical'
elif row['value'] < 60 or row['value'] > 100:
return 'warning'
else:
return 'normal'
elif row['code'] == '8310-5': # Temperature
if row['value'] < 35.0 or row['value'] > 40.0:
return 'critical'
elif row['value'] < 36.0 or row['value'] > 38.0:
return 'warning'
else:
return 'normal'
return 'normal'
# Configurar conexão
connector = HealthDataConnector(
fhir_url="http://localhost:8080/fhir",
redcap_token="YOUR_REDCAP_TOKEN",
db_connection="sqlite:///health_data.db"
)
# Criar dataset unificado
unified_data = connector.create_unified_dataset()
unified_data.to_sql('unified_vitals', connector.db_engine, if_exists='replace')Fase 2: Dashboard PowerBI/Grafana (60 min)
Template PowerBI para sinais vitais:
{
"dashboard_config": {
"title": "Monitorização de Sinais Vitais - Tempo Real",
"refresh_interval": "30s",
"data_sources": [
{
"name": "FHIR_Server",
"type": "REST_API",
"url": "http://localhost:8080/fhir/Observation",
"auth": "none",
"refresh": "30s"
},
{
"name": "SQLite_Unified",
"type": "SQLite",
"connection": "sqlite:///health_data.db",
"table": "unified_vitals"
}
],
"panels": [
{
"title": "Sinais Vitais em Tempo Real",
"type": "time_series",
"query": "SELECT datetime, value, display FROM unified_vitals WHERE code IN ('8867-4', '8310-5') ORDER BY datetime DESC LIMIT 100",
"visualization": {
"type": "line_chart",
"x_axis": "datetime",
"y_axis": "value",
"series": "display",
"colors": {"Heart rate": "#ff6b6b", "Body temperature": "#4ecdc4"}
}
},
{
"title": "Distribuição de Alertas",
"type": "pie_chart",
"query": "SELECT alert_level, COUNT(*) as count FROM unified_vitals GROUP BY alert_level",
"colors": {"normal": "#51cf66", "warning": "#ffd43b", "critical": "#ff6b6b"}
},
{
"title": "Pacientes com Alertas Críticos",
"type": "table",
"query": "SELECT patient_id, display, value, unit, datetime FROM unified_vitals WHERE alert_level = 'critical' ORDER BY datetime DESC LIMIT 10"
},
{
"title": "Mapa de Calor - Alertas por Hora",
"type": "heatmap",
"query": "SELECT hour, day_of_week, COUNT(*) as alerts FROM unified_vitals WHERE alert_level != 'normal' GROUP BY hour, day_of_week"
}
]
}
}Implementação Grafana com Docker:
# docker-compose-grafana.yml
version: '3.7'
services:
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
- GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS=fhir-datasource
volumes:
- grafana-storage:/var/lib/grafana
- ./grafana/provisioning:/etc/grafana/provisioning
- ./grafana/dashboards:/var/lib/grafana/dashboards
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
volumes:
grafana-storage:Dashboard Grafana configurado via JSON:
{
"dashboard": {
"title": "Laboratório Saúde Digital - Monitorização",
"tags": ["health", "monitoring", "fhir"],
"timezone": "Europe/Lisbon",
"refresh": "30s",
"time": {
"from": "now-24h",
"to": "now"
},
"panels": [
{
"id": 1,
"title": "Frequência Cardíaca - Últimas 24h",
"type": "timeseries",
"targets": [
{
"expr": "SELECT datetime as time, value FROM unified_vitals WHERE code = '8867-4' AND datetime >= NOW() - INTERVAL 24 HOUR ORDER BY datetime",
"format": "time_series",
"intervalFactor": 1
}
],
"fieldConfig": {
"defaults": {
"color": {"mode": "palette-classic"},
"unit": "bpm",
"min": 40,
"max": 150,
"thresholds": {
"steps": [
{"color": "green", "value": 60},
{"color": "yellow", "value": 100},
{"color": "red", "value": 120}
]
}
}
}
},
{
"id": 2,
"title": "Alertas Críticos Ativos",
"type": "stat",
"targets": [
{
"expr": "SELECT COUNT(*) FROM unified_vitals WHERE alert_level = 'critical' AND datetime >= NOW() - INTERVAL 1 HOUR"
}
],
"fieldConfig": {
"defaults": {
"color": {"mode": "thresholds"},
"thresholds": {
"steps": [
{"color": "green", "value": 0},
{"color": "yellow", "value": 1},
{"color": "red", "value": 5}
]
}
}
}
},
{
"id": 3,
"title": "Distribuição de Pacientes por Unidade",
"type": "piechart",
"targets": [
{
"expr": "SELECT unit, COUNT(DISTINCT patient_id) as patients FROM unified_vitals uf JOIN redcap_data rd ON uf.patient_id = rd.patient_id GROUP BY unit"
}
]
}
]
}
}Fase 3: Análise Exploratória Avançada (45 min)
Notebook Jupyter para análise de padrões:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
class HealthDataAnalyzer:
def __init__(self, data):
self.data = data
def perform_eda(self):
"""Análise exploratória completa"""
# 1. Estatísticas descritivas por tipo de sinal
vital_stats = self.data.groupby(['code', 'display'])['value'].agg([
'count', 'mean', 'std', 'min', 'max', 'median'
]).round(2)
print("Estatísticas Descritivas por Sinal Vital:")
print(vital_stats)
# 2. Análise temporal
self.analyze_temporal_patterns()
# 3. Análise de correlações
self.analyze_correlations()
# 4. Detecção de outliers
self.detect_outliers()
# 5. Análise de qualidade dos dados
self.analyze_data_quality()
def analyze_temporal_patterns(self):
"""Analisar padrões temporais nos sinais vitais"""
# Preparar dados para análise temporal
hr_data = self.data[self.data['code'] == '8867-4'].copy()
hr_data['datetime'] = pd.to_datetime(hr_data['datetime'])
hr_data = hr_data.set_index('datetime').sort_index()
# Resample para médias horárias
hourly_hr = hr_data['value'].resample('H').mean()
# Criar visualização interativa
fig = make_subplots(
rows=3, cols=1,
subplot_titles=['Frequência Cardíaca - Série Temporal',
'Padrão Circadiano',
'Distribuição por Dia da Semana'],
vertical_spacing=0.1
)
# Série temporal completa
fig.add_trace(
go.Scatter(x=hourly_hr.index, y=hourly_hr.values,
mode='lines+markers', name='FC Média Horária'),
row=1, col=1
)
# Padrão circadiano (média por hora do dia)
circadian = hr_data.groupby(hr_data.index.hour)['value'].mean()
fig.add_trace(
go.Scatter(x=circadian.index, y=circadian.values,
mode='lines+markers', name='Padrão Circadiano'),
row=2, col=1
)
# Distribuição por dia da semana
weekly = hr_data.groupby(hr_data.index.day_name())['value'].mean()
fig.add_trace(
go.Bar(x=weekly.index, y=weekly.values, name='FC por Dia'),
row=3, col=1
)
fig.update_layout(height=800, title_text="Análise Temporal - Frequência Cardíaca")
fig.show()
def analyze_correlations(self):
"""Analisar correlações entre diferentes sinais vitais"""
# Pivot dos dados para ter sinais vitais como colunas
pivot_data = self.data.pivot_table(
index=['patient_id', 'datetime'],
columns='code',
values='value'
).reset_index()
# Calcular correlações
correlation_matrix = pivot_data.select_dtypes(include=[np.number]).corr()
# Heatmap de correlações
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0,
square=True, fmt='.2f')
plt.title('Matriz de Correlação - Sinais Vitais')
plt.show()
# Análise de correlação entre FC e temperatura
if '8867-4' in pivot_data.columns and '8310-5' in pivot_data.columns:
hr_temp_corr = stats.pearsonr(
pivot_data['8867-4'].dropna(),
pivot_data['8310-5'].dropna()
)
print(f"Correlação FC vs Temperatura: r={hr_temp_corr[0]:.3f}, p={hr_temp_corr[1]:.3f}")
def detect_outliers(self):
"""Detectar outliers usando múltiplos métodos"""
outliers_report = {}
for vital_code in self.data['code'].unique():
vital_data = self.data[self.data['code'] == vital_code]['value']
# Método IQR
Q1 = vital_data.quantile(0.25)
Q3 = vital_data.quantile(0.75)
IQR = Q3 - Q1
iqr_outliers = vital_data[(vital_data < Q1 - 1.5*IQR) | (vital_data > Q3 + 1.5*IQR)]
# Método Z-score
z_scores = np.abs(stats.zscore(vital_data.dropna()))
zscore_outliers = vital_data[z_scores > 3]
# Método clínico (valores fisiologicamente implausíveis)
clinical_outliers = self.get_clinical_outliers(vital_code, vital_data)
outliers_report[vital_code] = {
'total_observations': len(vital_data),
'iqr_outliers': len(iqr_outliers),
'zscore_outliers': len(zscore_outliers),
'clinical_outliers': len(clinical_outliers),
'outlier_percentage': (len(iqr_outliers) / len(vital_data)) * 100
}
return outliers_report
def get_clinical_outliers(self, vital_code, data):
"""Identificar outliers baseados em limites clínicos"""
clinical_ranges = {
'8867-4': (30, 220), # Heart rate: 30-220 bpm
'8310-5': (30.0, 45.0), # Temperature: 30-45°C
'59408-5': (50, 100) # SpO2: 50-100%
}
if vital_code in clinical_ranges:
min_val, max_val = clinical_ranges[vital_code]
return data[(data < min_val) | (data > max_val)]
return pd.Series()
def analyze_data_quality(self):
"""Analisar qualidade dos dados"""
quality_report = {
'total_records': len(self.data),
'missing_values': self.data.isnull().sum().to_dict(),
'duplicate_records': self.data.duplicated().sum(),
'data_types': self.data.dtypes.to_dict(),
'unique_patients': self.data['patient_id'].nunique(),
'date_range': {
'start': self.data['datetime'].min(),
'end': self.data['datetime'].max()
}
}
# Calcular completude por paciente
patient_completeness = self.data.groupby('patient_id').agg({
'code': 'nunique', # Quantos tipos diferentes de sinais vitais
'value': 'count' # Total de observações
})
quality_report['patient_completeness'] = {
'mean_vitals_per_patient': patient_completeness['code'].mean(),
'mean_observations_per_patient': patient_completeness['value'].mean()
}
return quality_report
# Executar análise
analyzer = HealthDataAnalyzer(unified_data)
analyzer.perform_eda()Fase 4: Sistema de Alertas em Tempo Real (30 min)
Sistema de notificações automáticas:
import smtplib
import asyncio
import websockets
import json
from email.mime.text import MIMEText
from datetime import datetime, timedelta
class RealTimeAlertSystem:
def __init__(self, fhir_url, notification_config):
self.fhir_url = fhir_url
self.notification_config = notification_config
self.active_alerts = {}
async def monitor_continuous(self):
"""Monitorização contínua com WebSocket"""
# Configurar WebSocket para updates em tempo real
uri = f"ws://localhost:8080/fhir/$websocket"
try:
async with websockets.connect(uri) as websocket:
# Subscription para observações críticas
subscription = {
"resourceType": "Subscription",
"status": "active",
"criteria": "Observation?code=8867-4,8310-5",
"channel": {
"type": "websocket",
"endpoint": uri
}
}
await websocket.send(json.dumps(subscription))
# Loop de monitorização
while True:
message = await websocket.recv()
observation = json.loads(message)
# Avaliar se requer alerta
alert_level = self.evaluate_observation(observation)
if alert_level in ['warning', 'critical']:
await self.trigger_alert(observation, alert_level)
except Exception as e:
print(f"Erro na monitorização: {e}")
# Fallback para polling
await self.monitor_polling()
def evaluate_observation(self, observation):
"""Avaliar observação e determinar nível de alerta"""
try:
code = observation.get('code', {}).get('coding', [{}])[0].get('code')
value = observation.get('valueQuantity', {}).get('value')
patient_id = observation.get('subject', {}).get('reference', '').replace('Patient/', '')
# Regras de alerta por tipo de sinal vital
if code == '8867-4': # Heart rate
if value < 40 or value > 150:
return 'critical'
elif value < 50 or value > 120:
return 'warning'
elif code == '8310-5': # Temperature
if value < 34.0 or value > 41.0:
return 'critical'
elif value < 35.5 or value > 38.5:
return 'warning'
# Verificar tendências (multiple observations)
trend_alert = self.check_trend_alerts(patient_id, code, value)
if trend_alert:
return trend_alert
except Exception as e:
print(f"Erro na avaliação: {e}")
return 'normal'
def check_trend_alerts(self, patient_id, code, current_value):
"""Verificar alertas baseados em tendências"""
# Buscar últimas 5 observações do mesmo tipo
params = {
"subject": f"Patient/{patient_id}",
"code": code,
"_count": 5,
"_sort": "-date"
}
response = requests.get(f"{self.fhir_url}/Observation", params=params)
recent_obs = response.json().get('entry', [])
if len(recent_obs) >= 3:
values = [obs['resource'].get('valueQuantity', {}).get('value')
for obs in recent_obs]
values = [v for v in values if v is not None]
if len(values) >= 3:
# Tendência crescente preocupante
if code == '8867-4' and all(values[i] > values[i+1] for i in range(len(values)-1)):
if values[0] - values[-1] > 20: # Aumento >20 bpm
return 'warning'
# Variabilidade excessiva
if np.std(values) > self.get_variability_threshold(code):
return 'warning'
return None
async def trigger_alert(self, observation, alert_level):
"""Disparar alerta através de múltiplos canais"""
alert_id = f"{observation.get('id')}_{datetime.now().strftime('%Y%m%d%H%M%S')}"
# Verificar se não é duplicado (throttling)
if alert_id in self.active_alerts:
return
alert_data = {
'id': alert_id,
'level': alert_level,
'patient_id': observation.get('subject', {}).get('reference', ''),
'vital_sign': observation.get('code', {}).get('coding', [{}])[0].get('display'),
'value': observation.get('valueQuantity', {}).get('value'),
'unit': observation.get('valueQuantity', {}).get('unit'),
'timestamp': datetime.now(),
'observation_id': observation.get('id')
}
self.active_alerts[alert_id] = alert_data
# Enviar notificações
await self.send_email_alert(alert_data)
await self.send_dashboard_notification(alert_data)
await self.log_alert(alert_data)
# Auto-resolve após 1 hora
asyncio.create_task(self.auto_resolve_alert(alert_id, 3600))
async def send_email_alert(self, alert_data):
"""Enviar alerta por email"""
if alert_data['level'] == 'critical':
subject = f"ALERTA CRÍTICO - {alert_data['vital_sign']}"
recipients = self.notification_config['critical_emails']
else:
subject = f"Alerta - {alert_data['vital_sign']}"
recipients = self.notification_config['warning_emails']
body = f"""
ALERTA DE MONITORIZAÇÃO - LAB SAÚDE DIGITAL
Paciente: {alert_data['patient_id']}
Sinal Vital: {alert_data['vital_sign']}
Valor: {alert_data['value']} {alert_data['unit']}
Nível: {alert_data['level'].upper()}
Timestamp: {alert_data['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}
Por favor, verificar imediatamente o paciente.
Dashboard: http://grafana.labsaude.up.pt/dashboard/vitals
"""
try:
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = self.notification_config['smtp_from']
msg['To'] = ', '.join(recipients)
server = smtplib.SMTP(self.notification_config['smtp_server'], 587)
server.starttls()
server.login(self.notification_config['smtp_user'],
self.notification_config['smtp_pass'])
server.send_message(msg)
server.quit()
except Exception as e:
print(f"Erro ao enviar email: {e}")
async def send_dashboard_notification(self, alert_data):
"""Enviar notificação para dashboard em tempo real"""
# WebSocket para dashboard
notification = {
'type': 'alert',
'data': alert_data,
'timestamp': alert_data['timestamp'].isoformat()
}
# Enviar para todos os clientes conectados
for client in self.connected_dashboards:
try:
await client.send(json.dumps(notification))
except:
pass # Cliente desconectado
# Configurar e iniciar sistema de alertas
alert_config = {
'smtp_server': 'smtp.up.pt',
'smtp_user': 'labsaude@up.pt',
'smtp_pass': 'password',
'smtp_from': 'labsaude@up.pt',
'critical_emails': ['medico.urgencia@up.pt', 'enfermeiro.chefe@up.pt'],
'warning_emails': ['enfermeiro@up.pt']
}
alert_system = RealTimeAlertSystem(
fhir_url="http://localhost:8080/fhir",
notification_config=alert_config
)
# Iniciar monitorização (em produção seria um serviço separado)
# asyncio.run(alert_system.monitor_continuous())Recursos Necessários
Ferramentas de Visualização
- Power BI Desktop ou Grafana (open source)
- Apache Superset (alternativa open source)
- Tableau Public (para prototipagem)
- Jupyter Notebook com plotly, matplotlib, seaborn
Infraestrutura de Dados
- Base de dados (PostgreSQL, SQLite para testes)
- Data warehouse (opcional: ClickHouse, BigQuery)
- Message broker (Redis, RabbitMQ) para alertas
- WebSocket server para updates em tempo real
Conectores e APIs
- ODBC/JDBC drivers para bases de dados
- REST API clients para FHIR
- Python libraries: pandas, sqlalchemy, requests
- Real-time streaming: Apache Kafka (avançado)
Metodologias de Ensino
Sessão Teórica (30 min)
- Princípios de visualização eficaz em saúde
- KPIs clínicos e métricas de qualidade
- Design thinking para dashboards médicos
- Considerações de usabilidade para profissionais de saúde
Sessão Prática (150 min)
- Configuração de fontes (30 min): Conectar múltiplas bases de dados
- Criação de dashboards (60 min): Desenvolvimento visual
- Análise exploratória (30 min): Padrões e insights
- Sistema de alertas (30 min): Configuração de notificações
Trabalho Autónomo (90 min)
- Personalizar dashboards para diferentes perfis de utilizador
- Implementar métricas KPI adicionais
- Criar relatórios automáticos executivos
- Desenvolver análises preditivas básicas
Avaliação
Critérios de Avaliação (100 pontos)
- Qualidade das visualizações (25 pts): Clareza, adequação, design
- Funcionalidade técnica (25 pts): Conectividade, performance, atualizações
- Insights extraídos (20 pts): Análise significativa dos dados
- Sistema de alertas (20 pts): Eficácia e configuração adequada
- Apresentação (10 pts): Demonstração e documentação
Entregáveis
- Dashboard completo com múltiplas visualizações
- Notebook de análise com insights documentados
- Sistema de alertas configurado e testado
- Manual de utilizador para diferentes perfis
- Relatório executivo com recomendações
Extensões Avançadas
Machine Learning Integration
- Modelos preditivos para deterioração clínica
- Clustering de pacientes por padrões
- Anomaly detection automática
- Forecasting de demanda de recursos
Advanced Analytics
- Cohort analysis para estudos longitudinais
- Survival analysis com curvas de Kaplan-Meier
- A/B testing para intervenções clínicas
- Time series forecasting com ARIMA/Prophet
Mobile e Wearables
- Apps móveis para profissionais
- Push notifications para alertas críticos
- Wearable data integration (smartwatches)
- Offline capability para áreas sem conectividade
Integração com Pipeline
Input das Experiências Anteriores
- Dados unificados das integrações FHIR (Experiência 4)
- Dados estruturados do RedCap (Experiência 3)
- Métricas processadas dos sinais (Experiência 2)
- Dados brutos dos sensores (Experiência 1)
Output para Experiência Final
- KPIs em tempo real para monitorização IoMT
- Alertas configurados para integração com dispositivos
- Dashboards otimizados para dados de sensores
- Pipelines analíticos prontos para escala
Preparação para Produção
- Performance benchmarks estabelecidos
- User acceptance testing completo
- Documentation para manutenção
- Backup e disaster recovery configurados