In [80]:
import json
import os
import requests
import urllib3
from typing import Dict, Optional, List
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)
MispGalaxyUpdater¶
In [85]:
class MispGalaxyUpdater:
def __init__(self, misp_url: str, misp_api_key: str, verify_ssl: bool = True):
self.misp_url = misp_url.rstrip("/")
self.api_key = misp_api_key
self.verify_ssl = verify_ssl
def update_galaxy_file(self, galaxy_data: Dict, galaxy_dir: str):
os.makedirs(galaxy_dir, exist_ok=True)
path = os.path.join(galaxy_dir, f"{galaxy_data['type']}.json")
with open(path, "w", encoding="utf-8") as f:
json.dump(galaxy_data, f, indent=2)
print(f"Galaxy saved to: {path}")
def update_cluster_file(self, cluster_data: Dict, cluster_dir: str):
os.makedirs(cluster_dir, exist_ok=True)
path = os.path.join(cluster_dir, f"{cluster_data['type']}.json")
with open(path, "w", encoding="utf-8") as f:
json.dump(cluster_data, f, indent=2)
print(f"Cluster saved to: {path}")
def force_update_galaxies(self):
endpoint = f"{self.misp_url}/galaxies/update"
headers = {
"Authorization": self.api_key,
"Content-Type": "application/json",
"Accept": "application/json",
}
response = requests.post(
endpoint,
headers=headers,
verify=self.verify_ssl,
)
if response.status_code == 200:
print("Force update galaxies successfully.")
else:
print(f"Failed to force update galaxies. Status: {response.status_code}, Message: {response.text}")
MispFeedClient¶
In [83]:
class MispFeedClient:
def __init__(self, fqdn: str, api_token: str):
self.base_url = fqdn
self.api_token = api_token
self.session = self._create_session()
self._galaxy_api_path = "/v3.0/threatintel/misp/galaxies"
self._cluster_api_path = "/v3.0/threatintel/misp/clusters"
def _create_session(self) -> requests.Session:
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET"]
)
session = requests.Session()
session.headers.update({
'Authorization': f'Bearer {self.api_token}'
})
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
return session
def fetch_galaxy(self, galaxy_name: str) -> Dict:
url = f"{self.base_url}/{self._galaxy_api_path}/{galaxy_name}.json"
try:
response = self.session.get(url)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Failed to fetch galaxy, error: {e}")
return {}
def fetch_cluster(self, cluster_name: str) -> Dict:
url = f"{self.base_url}/{self._cluster_api_path}/{cluster_name}.json"
try:
response = self.session.get(url)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Failed to fetch cluster, error: {e}")
return {}
Get Started¶
Configuring Custom Galaxies and Clusters¶
Add Galaxy via Galaxy File¶
Create galaxy json file, and put it in your MISP galaxies directory.
In [ ]:
v1_url = "VISION_ONE_URL"
v1_api_token = "YOUR_V1_API_TOKEN"
misp_feed_client = MispFeedClient(v1_url, v1_api_token)
misp_url = "YOUR_MISP_URL"
misp_key = "YOUR_MISP_KEY"
misp_updater = MispGalaxyUpdater(misp_url, misp_key)
# Create galaxy
galaxy_names = [
"trendmicro-campaign-galaxy",
"trendmicro-intrusion-set-galaxy",
"trendmicro-malware-galaxy",
"trendmicro-tool-galaxy",
"trendmicro-vulnerability-galaxy"
]
misp_galaxy_dir = "/var/www/MISP/app/files/misp-galaxy/galaxies"
for galaxy_name in galaxy_names:
galaxy = misp_feed_client.fetch_galaxy(galaxy_name)
if galaxy:
misp_updater.update_galaxy_file(galaxy, misp_galaxy_dir)
# Update galaxies after creating galaxy and cluster file
misp_updater.force_update_galaxies()
Add Cluster via Cluster File¶
In [ ]:
v1_url = "VISION_ONE_URL"
v1_api_token = "YOUR_V1_API_TOKEN"
misp_feed_client = MispFeedClient(v1_url, v1_api_token)
misp_url = "YOUR_MISP_URL"
misp_key = "YOUR_MISP_KEY"
misp_updater = MispGalaxyUpdater(misp_url, misp_key)
# Create cluster
cluster_names = [
"trendmicro-campaign",
"trendmicro-intrusion-set",
"trendmicro-malware",
"trendmicro-tool",
"trendmicro-vulnerability"
]
misp_cluster_dir = "/var/www/MISP/app/files/misp-galaxy/clusters"
for cluster_name in cluster_names:
cluster = misp_feed_client.fetch_cluster(cluster_name)
if cluster:
misp_updater.update_cluster_file(cluster, misp_cluster_dir)
# Update galaxies after creating galaxy and cluster file
misp_updater.force_update_galaxies()