Neste post irei falar sobre a ferramenta click_watcher que desenvolvi para servir como um filebeat, porém que envia os logs via websocket. O script funciona apenas para logs no formato JSON.
Para você que não quer perder tempo ou que ja leu o post, esse aqui é o repositório do ClickSiem Esse aqui é o click_watcher Esse aqui é o click_server.
Para não apenas jogar o código no github, decidi criar esse post para mostrar como foi desenvolvida a ferramenta.
Proof
Prova de conceito do click_watcher.
Criação do ambiente
vamos utilizar uv para gerenciar o projeto python.
- Inicializar o projeto
uv init- Adicionar dependencias
uv add watchdog websockets- Iniciar o virtual environment
source .venv/bin/activateMonitorando edição dos logs
Para monitorar qualquer nova atualização e ou modificação do arquivo de alertas, iremos utilizar a biblioteca watchdog.
Watchdog funciona em multiplas plataformas, não apenas linux
A biblioteca watchdog utiliza no linux o inotify para monitorar qualquer mudança evento de filesystem em um diretório.
Vamos para um exemplo antes de trazer o código final.
Código para identificar alterações no arquivo
from watchdog.observers import Observerfrom watchdog.events import FileSystemEventHandlerimport timeimport os
file = './test.log'
class LogWatcher(FileSystemEventHandler): def on_modified(self, event): global file if os.path.abspath(event.src_path) == os.path.abspath(file): print('arquivo modificado')
watcher = LogWatcher()observer = Observer()observer.schedule(watcher, os.path.dirname(file) or '.', recursive=False)observer.start() # inicia thread em backgroundwhile True: time.sleep(1)Com esse código, conseguimos identificar eventos de mudanças no arquivo, porém ainda não conseguimos ver o conteúdo que foi alterado, então, como iremos fazer?
Para monitorar também alterações no arquivo, nós iremos:
- Abrir o arquivo
- Mudar a nossa posição do arquivo para o final
- Para cada evento de
on_modified, realizar a leitura e atualizar a nossa posição no arquivo.
...from io import SEEK_END, SEEK_SET
class LogWatcher(FileSystemEventHandler): def __init__(self): self.fd = open(file, 'r') # abre o arquivo self.fd.seek(0, SEEK_END) # muda a posição para o final
def on_modified(self, event): ... for log in self.fd: # lê novas entradas do arquivo e atualiza a posição log = log.strip() self.process_log(log) def process_log(self, log): ...Veja que tem algumas variaveis que não foram utilizadas ainda, iremos utilizar nos proximos posts da série.
Depois disso você pode ter se perdido, mas calma que eu vou te explicar.
Processamento em lote
Fizemos 2 formas de processar os logs:
- Processar os logs a cada 1 minuto
- Processar os logs se o buffer ultrapassar
max_batch
Criamos uma thread separada para esperar 1 segundo antes de chamar o flush_logs
Fizemos uma condição para chamar o flush caso a quantidade de logs no buffer seja superior a max_batch
if len(self.buffer) >= self.max_batch: flush = TrueIdentificar rotate do arquivo
Além disso, adicionamos formas de evitar problemas caso o arquivo seja deletado e perdermos o descritor do arquivo.
def on_deleted(self, event): ... self._close_file() # fecha o descritor do arquivodef on_created(self, event): ... self.seek_end() # acessa novamente o arquivo self.file.seek(0, SEEK_SET) # posiciona o arquivo para o inicio para não perder logs do novo arquivoTestando o projeto file watcher
Para testar o projeto, vamos utilizar o click_server.
O click_server é um servidor NodeJS que implementa um servidor websocket usando o framework NestJS!
Vamos testar o ambiente simulando como seria com o Wazuh.
Servidor
Inicialize o servidor conforme a documentação.
npm run start
Click Watcher
Inicialize o click_watcher conforme a documentação.
uv run main.py -u 'ws://localhost:3000/' -f ./alerts.json
Test
Rode o comando para gerar alertas aleatórios depois de iniciar o servidor e o watcher.
uv run test/random_wazuh_events.py 5 -o alerts.json
Wazuh?
Você ja consegue utilizar com o wazuh, apenas faça:
uv run main.py -u 'ws://localhost:3000/' -f /var/ossec/logs/alerts/alerts.jsonProximos passos
No proximo post irei desenvolver e publicar o servidor completo, realizando o envio dos alertas dinamicamente para o ClickHouse e ja funcionando com os Webhooks
Server - Proximo Post
- Enviar alertas para o clickhouse criando dinamicamente os campos
- Criar lógica do webhook
- Criar endpoints
Scheduler
- Criar base do scheduler
- Conectar no ClickHouse
- Criar lógica de funcionamento
- Enviar os alertas para o click_server