In [80]:
import json
import os
import requests
import urllib3
from typing import Dict, Optional, List
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from urllib3.exceptions import InsecureRequestWarning

urllib3.disable_warnings(InsecureRequestWarning)

MispGalaxyUpdater¶

In [85]:
class MispGalaxyUpdater:
    def __init__(self, misp_url: str, misp_api_key: str, verify_ssl: bool = True):
        self.misp_url = misp_url.rstrip("/")
        self.api_key = misp_api_key
        self.verify_ssl = verify_ssl

    def update_galaxy_file(self, galaxy_data: Dict, galaxy_dir: str):
        os.makedirs(galaxy_dir, exist_ok=True)
        path = os.path.join(galaxy_dir, f"{galaxy_data['type']}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(galaxy_data, f, indent=2)
        print(f"Galaxy saved to: {path}")

    def update_cluster_file(self, cluster_data: Dict, cluster_dir: str):
        os.makedirs(cluster_dir, exist_ok=True)
        path = os.path.join(cluster_dir, f"{cluster_data['type']}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(cluster_data, f, indent=2)
        print(f"Cluster saved to: {path}")

    def force_update_galaxies(self):
        endpoint = f"{self.misp_url}/galaxies/update"
        headers = {
            "Authorization": self.api_key,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

        response = requests.post(
            endpoint,
            headers=headers,
            verify=self.verify_ssl,
        )
        if response.status_code == 200:
            print("Force update galaxies successfully.")
        else:
            print(f"Failed to force update galaxies. Status: {response.status_code}, Message: {response.text}")

MispFeedClient¶

In [83]:
class MispFeedClient:
    def __init__(self, fqdn: str, api_token: str):
        self.base_url = fqdn
        self.api_token = api_token
        self.session = self._create_session()
        self._galaxy_api_path = "/v3.0/threatintel/misp/galaxies"
        self._cluster_api_path = "/v3.0/threatintel/misp/clusters"
    
    def _create_session(self) -> requests.Session:
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"]
        )
        session = requests.Session()
        session.headers.update({
            'Authorization': f'Bearer {self.api_token}'
        })
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        return session
    
    def fetch_galaxy(self, galaxy_name: str) -> Dict:
        url = f"{self.base_url}/{self._galaxy_api_path}/{galaxy_name}.json"
        try:
            response = self.session.get(url)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Failed to fetch galaxy, error: {e}")
            return {}

    def fetch_cluster(self, cluster_name: str) -> Dict:
        url = f"{self.base_url}/{self._cluster_api_path}/{cluster_name}.json"
        try:
            response = self.session.get(url)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Failed to fetch cluster, error: {e}")
            return {}

Get Started¶

Configuring Custom Galaxies and Clusters¶

Add Galaxy via Galaxy File¶

Create galaxy json file, and put it in your MISP galaxies directory.

In [ ]:
v1_url = "VISION_ONE_URL"
v1_api_token = "YOUR_V1_API_TOKEN"
misp_feed_client = MispFeedClient(v1_url, v1_api_token)

misp_url = "YOUR_MISP_URL"
misp_key = "YOUR_MISP_KEY"
misp_updater = MispGalaxyUpdater(misp_url, misp_key)

# Create galaxy
galaxy_names = [
    "trendmicro-campaign-galaxy",
    "trendmicro-intrusion-set-galaxy",
    "trendmicro-malware-galaxy",
    "trendmicro-tool-galaxy",
    "trendmicro-vulnerability-galaxy"
]
misp_galaxy_dir = "/var/www/MISP/app/files/misp-galaxy/galaxies"
for galaxy_name in galaxy_names:
    galaxy = misp_feed_client.fetch_galaxy(galaxy_name)
    if galaxy:
        misp_updater.update_galaxy_file(galaxy, misp_galaxy_dir)

# Update galaxies after creating galaxy and cluster file
misp_updater.force_update_galaxies()

Add Cluster via Cluster File¶

In [ ]:
v1_url = "VISION_ONE_URL"
v1_api_token = "YOUR_V1_API_TOKEN"
misp_feed_client = MispFeedClient(v1_url, v1_api_token)

misp_url = "YOUR_MISP_URL"
misp_key = "YOUR_MISP_KEY"
misp_updater = MispGalaxyUpdater(misp_url, misp_key)

# Create cluster
cluster_names = [
    "trendmicro-campaign",
    "trendmicro-intrusion-set",
    "trendmicro-malware",
    "trendmicro-tool",
    "trendmicro-vulnerability"
]
misp_cluster_dir = "/var/www/MISP/app/files/misp-galaxy/clusters"
for cluster_name in cluster_names:
    cluster = misp_feed_client.fetch_cluster(cluster_name)
    if cluster:
        misp_updater.update_cluster_file(cluster, misp_cluster_dir)

# Update galaxies after creating galaxy and cluster file
misp_updater.force_update_galaxies()