598 words
3 minutes
Wazuh 4.x como um produto / parte 2 - Filebeat -> click_watcher

Neste post irei falar sobre a ferramenta click_watcher que desenvolvi para servir como um filebeat, porém que envia os logs via websocket. O script funciona apenas para logs no formato JSON.

Para você que não quer perder tempo ou que ja leu o post, esse aqui é o repositório do ClickSiem Esse aqui é o click_watcher Esse aqui é o click_server.

Para não apenas jogar o código no github, decidi criar esse post para mostrar como foi desenvolvida a ferramenta.

Proof#

Prova de conceito do click_watcher.

Criação do ambiente#

vamos utilizar uv para gerenciar o projeto python.

  1. Inicializar o projeto
Terminal window
uv init
  1. Adicionar dependencias
Terminal window
uv add watchdog websockets
  1. Iniciar o virtual environment
Terminal window
source .venv/bin/activate

Monitorando edição dos logs#

Para monitorar qualquer nova atualização e ou modificação do arquivo de alertas, iremos utilizar a biblioteca watchdog.

Watchdog funciona em multiplas plataformas, não apenas linux

A biblioteca watchdog utiliza no linux o inotify para monitorar qualquer mudança evento de filesystem em um diretório.

Vamos para um exemplo antes de trazer o código final.

Código para identificar alterações no arquivo

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
import os
file = './test.log'
class LogWatcher(FileSystemEventHandler):
def on_modified(self, event):
global file
if os.path.abspath(event.src_path) == os.path.abspath(file):
print('arquivo modificado')
watcher = LogWatcher()
observer = Observer()
observer.schedule(watcher, os.path.dirname(file) or '.', recursive=False)
observer.start() # inicia thread em background
while True:
time.sleep(1)

Com esse código, conseguimos identificar eventos de mudanças no arquivo, porém ainda não conseguimos ver o conteúdo que foi alterado, então, como iremos fazer?

Para monitorar também alterações no arquivo, nós iremos:

  1. Abrir o arquivo
  2. Mudar a nossa posição do arquivo para o final
  3. Para cada evento de on_modified, realizar a leitura e atualizar a nossa posição no arquivo.
...
from io import SEEK_END, SEEK_SET
class LogWatcher(FileSystemEventHandler):
def __init__(self):
self.fd = open(file, 'r') # abre o arquivo
self.fd.seek(0, SEEK_END) # muda a posição para o final
def on_modified(self, event):
...
for log in self.fd: # lê novas entradas do arquivo e atualiza a posição
log = log.strip()
self.process_log(log)
def process_log(self, log):
...

Veja que tem algumas variaveis que não foram utilizadas ainda, iremos utilizar nos proximos posts da série.

Depois disso você pode ter se perdido, mas calma que eu vou te explicar.

Processamento em lote#

Fizemos 2 formas de processar os logs:

  1. Processar os logs a cada 1 minuto
  2. Processar os logs se o buffer ultrapassar max_batch

Criamos uma thread separada para esperar 1 segundo antes de chamar o flush_logs

Fizemos uma condição para chamar o flush caso a quantidade de logs no buffer seja superior a max_batch

if len(self.buffer) >= self.max_batch:
flush = True

Identificar rotate do arquivo#

Além disso, adicionamos formas de evitar problemas caso o arquivo seja deletado e perdermos o descritor do arquivo.

def on_deleted(self, event):
...
self._close_file() # fecha o descritor do arquivo
def on_created(self, event):
...
self.seek_end() # acessa novamente o arquivo
self.file.seek(0, SEEK_SET) # posiciona o arquivo para o inicio para não perder logs do novo arquivo

Testando o projeto file watcher#

Para testar o projeto, vamos utilizar o click_server.

O click_server é um servidor NodeJS que implementa um servidor websocket usando o framework NestJS!

Vamos testar o ambiente simulando como seria com o Wazuh.

Servidor#

Inicialize o servidor conforme a documentação.

Terminal window
npm run start

Server

Click Watcher#

Inicialize o click_watcher conforme a documentação.

Terminal window
uv run main.py -u 'ws://localhost:3000/' -f ./alerts.json

Watcher

Test#

Rode o comando para gerar alertas aleatórios depois de iniciar o servidor e o watcher.

Terminal window
uv run test/random_wazuh_events.py 5 -o alerts.json

test

Wazuh?#

Você ja consegue utilizar com o wazuh, apenas faça:

Terminal window
uv run main.py -u 'ws://localhost:3000/' -f /var/ossec/logs/alerts/alerts.json

Proximos passos#

No proximo post irei desenvolver e publicar o servidor completo, realizando o envio dos alertas dinamicamente para o ClickHouse e ja funcionando com os Webhooks

Server - Proximo Post#

  1. Enviar alertas para o clickhouse criando dinamicamente os campos
  2. Criar lógica do webhook
  3. Criar endpoints

Scheduler#

  1. Criar base do scheduler
  2. Conectar no ClickHouse
  3. Criar lógica de funcionamento
  4. Enviar os alertas para o click_server
Wazuh 4.x como um produto / parte 2 - Filebeat -> click_watcher
https://blog.souzo.me/posts/wazuh-as-a-product-part-2/
Author
souzo
Published at
2025-11-28
License
CC BY-NC-SA 4.0