Já teve a vontade incontrolável de visualizar o tráfego da sua rede em tempo real, como se fosse um hacker de filme hollywoodiano? 🤔 Neste tutorial, vamos embarcar na aventura de construir um dashboard interativo de análise de tráfego de rede com Python e Streamlit. Sim, você leu certo, Streamlit! Essa belezinha open-source é um framework Python que permite desenvolver aplicações web para análise e processamento de dados, sem precisar vender a alma para o diabo. 😈
Ao final deste tutorial, você será capaz de capturar pacotes de rede crus da sua placa de rede (NIC), processá-los e criar visualizações incríveis que se atualizam em tempo real, tipo aqueles painéis futurísticos de filmes sci-fi. Prepare-se para impressionar os amigos! 😎
Por Que a Análise de Tráfego de Rede é Tão Importante? (além de ser incrivelmente legal)
A análise de tráfego de rede é crucial em empresas onde as redes são a espinha dorsal de praticamente tudo. Basicamente, analisamos os pacotes de rede, monitoramos o tráfego (entradas e saídas) e interpretamos esses pacotes enquanto eles fluem pela rede. É como bisbilhotar a conversa dos dados, mas com um propósito nobre. 🕵️♀️
Com essa técnica, podemos identificar padrões de segurança, detectar anomalias e garantir que a rede funcione como um relógio suíço. ⌚ Este projeto que vamos construir é especialmente útil porque permite visualizar e analisar a atividade da rede em tempo real. Imagine só: você poderá solucionar problemas, otimizar o desempenho e analisar a segurança como um verdadeiro profissional! 👨💻
Pré-requisitos (ou o que você precisa ter para não se perder no caminho)
- Python 3.8 ou superior instalado (a cobra precisa estar viva e pronta para o ataque!).
- Conhecimento básico de redes de computadores (não precisa ser um guru, mas saber o básico ajuda).
- Familiaridade com a linguagem Python e suas bibliotecas (pandas, matplotlib, etc.).
- Noções básicas de visualização de dados (gráficos, dashboards, essas coisas bonitas).
Quer saber mais sobre STREAMLIT?
Compre meu Livro Disponível na amazon: https://www.amazon.com.br/dp/B0CW1BH7NL
Como Configurar o Projeto (preparando o terreno para a aventura)
Crie a estrutura do projeto e instale as ferramentas necessárias com o Pip:
Bash
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
Usaremos o Streamlit para as visualizações do dashboard (a cereja do bolo), Pandas para processar os dados (o mestre da organização), Scapy para capturar e processar os pacotes (o espião da rede) e, finalmente, Plotly para plotar gráficos com os dados coletados (o artista visual). 🎨
Como Construir as Funcionalidades Principais (a mágica acontece aqui)
Colocaremos todo o código em um único arquivo chamado dashboard.py
. Primeiro, vamos importar os elementos que usaremos:
Python
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
Agora, vamos configurar o logging para rastrear eventos e executar nossa aplicação em modo debug. Definimos o nível de logging como INFO
, o que significa que eventos com nível INFO
ou superior serão exibidos. Se você não estiver familiarizado com logging em Python, dê uma olhada na documentação oficial. 🤓
Python
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
Em seguida, vamos construir nosso processador de pacotes. Implementaremos a funcionalidade de processamento dos pacotes capturados nesta classe:
Python
class PacketProcessor:
"""Processa e analisa pacotes de rede"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Converte o número do protocolo para o nome"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Processa um único pacote e extrai informações relevantes"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# Adiciona informações específicas do TCP
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# Adiciona informações específicas do UDP
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# Mantém apenas os últimos 10000 pacotes para evitar problemas de memória
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Erro ao processar pacote: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Converte os dados do pacote para um DataFrame do pandas"""
with self.lock:
return pd.DataFrame(self.packet_data)
Essa classe é o coração da nossa aplicação e possui funções utilitárias para processar os pacotes. Os pacotes de rede são categorizados em dois tipos no nível de transporte (TCP e UDP) e no protocolo ICMP no nível de rede. Se você não conhece os conceitos de TCP/IP, recomendo dar uma olhada neste artigo no freeCodeCamp News. 😉
Nosso construtor rastreia todos os pacotes categorizados nesses tipos de protocolo TCP/IP. Também anotamos o tempo de captura, os dados capturados e o número de pacotes. Usamos um bloqueio de thread para garantir que apenas um pacote seja processado por vez. Isso pode ser estendido para permitir o processamento paralelo de pacotes. 🤯
A função get_protocol_name
nos ajuda a obter o tipo correto de protocolo com base em seus números. A IANA (Internet Assigned Numbers Authority) atribui números padronizados para identificar diferentes protocolos em um pacote de rede. Ao encontrarmos esses números no pacote, saberemos qual tipo de protocolo está sendo usado. Para este projeto, mapearemos apenas TCP, UDP e ICMP (Ping). Se encontrarmos outro tipo de pacote, o classificaremos como OTHER(<protocol_num>)
. 🤔
A função process_packet
lida com a funcionalidade principal que processa os pacotes individuais. Se o pacote contiver uma camada IP, ele registrará os endereços IP de origem e destino, o tipo de protocolo, o tamanho do pacote e o tempo decorrido desde o início da captura. Para pacotes com protocolos de camada de transporte específicos (como TCP e UDP), capturaremos as portas de origem e destino, juntamente com as flags TCP para pacotes TCP. Esses detalhes serão armazenados na lista packet_data
. Também manteremos o controle do packet_count
à medida que os pacotes são processados. 🤓
A função get_dataframe
nos ajuda a converter a lista packet_data
em um DataFrame do Pandas, que será usado para nossa visualização. 📊
Como Criar as Visualizações no Streamlit (a hora do show)
Agora é hora de construir nosso Dashboard interativo no Streamlit. Definiremos uma função chamada create_visualization
no script dashboard.py
(fora da classe de processamento de pacotes):
Python
def create_visualizations(df: pd.DataFrame):
"""Cria todas as visualizações do dashboard"""
if len(df) > 0:
# Distribuição de protocolos
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Distribuição de Protocolos"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# Linha do tempo dos pacotes
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Pacotes por Segundo"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# Principais IPs de origem
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Principais Endereços IP de Origem"
)
st.plotly_chart(fig_sources, use_container_width=True)
Essa função recebe o DataFrame como entrada e plota três gráficos:
- Gráfico de Distribuição de Protocolos: Mostra a proporção de diferentes protocolos (TCP, UDP, ICMP) no tráfego capturado.
- Gráfico de Linha do Tempo dos Pacotes: Mostra o número de pacotes processados por segundo ao longo do tempo.
- Gráfico dos Principais Endereços IP de Origem: Destaca os 10 principais endereços IP que enviaram mais pacotes.
O gráfico de distribuição de protocolos é um gráfico de pizza com as contagens de protocolo para os três tipos (junto com OTHER
). Usamos as ferramentas Streamlit e Plotly para plotar esses gráficos. Como também anotamos o timestamp desde o início da captura, usaremos esses dados para plotar a tendência dos pacotes capturados ao longo do tempo. 📈
Para o segundo gráfico, faremos uma operação groupby
nos dados e obteremos o número de pacotes capturados em cada segundo (S
significa segundos) e, finalmente, plotaremos o gráfico. 📊
Para o terceiro gráfico, contaremos os IPs de origem distintos observados e plotaremos um gráfico das contagens de IP para mostrar os 10 principais IPs. 🗺️
Como Capturar os Pacotes de Rede (abrindo o portal para o mundo dos dados)
Agora, vamos construir a funcionalidade para capturar os dados dos pacotes de rede:
Python
def start_packet_capture():
"""Inicia a captura de pacotes em uma thread separada"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
Esta função instancia a classe PacketProcessor
e usa a função sniff
do módulo scapy para iniciar a captura dos pacotes. Usamos threads para permitir a captura de pacotes independentemente do fluxo principal do programa. Isso garante que a captura não bloqueie outras operações, como a atualização do dashboard em tempo real. Também retornamos a instância PacketProcessor
criada para que possa ser usada no programa principal. 🔄
Juntando Tudo (a grande finalização)
Agora vamos juntar todas as peças com a função main
, que atuará como a função principal do nosso programa:
Python
def main():
"""Função principal para executar o dashboard"""
st.set_page_config(page_title="Análise de Tráfego de Rede", layout="wide")
st.title("Análise de Tráfego de Rede em Tempo Real")
# Inicializa o processador de pacotes no estado da sessão
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# Cria o layout do dashboard
col1, col2 = st.columns(2)
# Obtém os dados atuais
df = st.session_state.processor.get_dataframe()
# Exibe as métricas
with col1:
st.metric("Total de Pacotes", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Duração da Captura", f"{duration:.2f}s")
# Exibe as visualizações
create_visualizations(df)
# Exibe os pacotes recentes
st.subheader("Pacotes Recentes")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# Adiciona botão de atualização
if st.button('Atualizar Dados'):
st.rerun()
# Atualização automática
time.sleep(2)
st.rerun()
Essa função instancia o dashboard do Streamlit e integra todos os nossos componentes. Primeiro, definimos o título da página do dashboard e inicializamos o PacketProcessor
. Usamos o estado da sessão no Streamlit para garantir que apenas uma instância de captura de pacotes seja criada e que seu estado seja mantido. 🖥️
Obtemos o DataFrame do estado da sessão sempre que os dados são processados e começamos a exibir as métricas e as visualizações. Também exibimos os pacotes capturados recentemente, juntamente com informações como timestamp, IPs de origem e destino, protocolo e tamanho do pacote. Adicionamos a capacidade de o usuário atualizar manualmente os dados do dashboard, enquanto também o atualizamos automaticamente a cada dois segundos. 🔄
Finalmente, execute o programa com o seguinte comando:
Bash
sudo streamlit run dashboard.py
Observação: Você precisará executar o programa com sudo
porque a captura de pacotes requer privilégios administrativos. Se você estiver no Windows, abra o terminal como Administrador e execute o programa sem o prefixo sudo
. ⚠️
Dê um tempo para o programa começar a capturar pacotes. Se tudo correr bem, você verá algo como isto:
Essas são todas as visualizações que implementamos no nosso dashboard do Streamlit. 🎉
Melhorias Futuras (para os mais aventureiros)
Aqui estão algumas ideias para aprimorar o dashboard:
- Adicionar recursos de machine learning para detecção de anomalias (que tal um sistema que prevê ataques hackers?). 🤖
- Implementar mapeamento geográfico de IPs (para visualizar de onde vêm os pacotes, tipo um mapa de calor global). 🌎
- Criar alertas personalizados com base em padrões de tráfego (para ser avisado quando algo suspeito estiver acontecendo). 🚨
- Adicionar opções de análise de payload de pacotes (para mergulhar ainda mais fundo nos dados). 🔬
Conclusão (ufa, chegamos ao fim!)
Parabéns! Você construiu com sucesso um dashboard de análise de tráfego de rede em tempo real com Python e Streamlit. Este programa fornecerá insights valiosos sobre o comportamento da rede e pode ser estendido para vários casos de uso, desde monitoramento de segurança até otimização de rede. 🚀
Espero que você tenha aprendido o básico sobre análise de tráfego de rede e um pouco de programação em Python. Obrigado por ler! 😊
Quer saber mais sobre STREAMLIT?
Compre meu Livro Disponível na Amazon: https://www.amazon.com.br/dp/B0CW1BH7NL