189 lines
5.2 KiB
Python
Executable File
189 lines
5.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import requests
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
import os
|
|
import otc_metadata.services
|
|
import argparse
|
|
import logging
|
|
|
|
# ===== Configuration =====
# Umami login credentials, read from the environment at import time.
# os.getenv returns None for a variable that is not set.
USERNAME = os.getenv("UMAMI_USERNAME")
PASSWORD = os.getenv("UMAMI_PASSWORD")
# NOTE(review): not referenced anywhere in the visible part of this file;
# save_to_file() derives its own output path.
OUTPUT_FILE = "stats.json"
|
|
|
|
# ===== Data =====
# Service types that filter_unique_service_types() skips and never includes
# in its result.
blacklist = [
    "ed",
    "sd"
]
|
|
|
|
# ===== Logger =====

# Configure the root logger at import time: INFO and above, each record
# prefixed with its timestamp and level name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
# Module-level logger used by the functions below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def parse_args():
    """
    Build the command-line interface and parse the process arguments.

    Returns the argparse namespace with the attributes ``base_url``,
    ``cloud_environment``, ``environment`` (a list of one or more values),
    ``limit`` (int) and ``website_id`` (required).
    """
    # One (flag, add_argument keyword arguments) pair per option, listed in
    # the order they should appear in the generated help text.
    option_specs = (
        ("--base-url", {
            "default": "https://analytics.otc-service.com",
            "help": "Base_Url of analytics server",
        }),
        ("--cloud-environment", {
            "default": "eu_de",
            "choices": ['eu_de', 'swiss'],
            "help": "Cloud Environments (default: eu_de)",
        }),
        ("--environment", {
            "default": ['public'],
            "nargs": '+',
            "choices": ['public', 'internal', 'hidden'],
            "help": "Environments (default: ['public'])",
        }),
        ("--limit", {
            "type": int,
            "default": 10,
            "help": "Result count",
        }),
        ("--website-id", {
            "required": True,
            "help": "Umami Website ID",
        }),
    )

    cli = argparse.ArgumentParser(description="Analytics Script")
    for flag, options in option_specs:
        cli.add_argument(flag, **options)

    return cli.parse_args()
|
|
|
|
|
|
def get_umami_token(base_url, timeout=30):
    """Log in to the Umami API and return the bearer token.

    Args:
        base_url: Root URL of the Umami server; ``/api/auth/login`` is
            appended to it directly.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` waits indefinitely, so a stalled server
            would hang the whole script.

    Returns:
        The value of the ``token`` field of the JSON response, or ``None``
        when the response has no such field.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    url = f"{base_url}/api/auth/login"
    # Credentials come from the UMAMI_USERNAME / UMAMI_PASSWORD environment
    # variables read at module import.
    credentials = {"username": USERNAME, "password": PASSWORD}
    response = requests.post(url, json=credentials, timeout=timeout)
    response.raise_for_status()
    return response.json().get("token")
|
|
|
|
|
|
def get_4_weeks_range():
    """Return the last four weeks as a pair of UNIX timestamps in milliseconds.

    Returns:
        A ``(start_ts, end_ts)`` tuple of ints, where ``end_ts`` is the
        current time and ``start_ts`` is exactly four weeks earlier.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    # Use a timezone-aware "now". A naive datetime from utcnow() is treated
    # as *local* time by .timestamp(), which shifts both ends of the range
    # by the host's UTC offset on any machine not running in UTC.
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(weeks=4)

    # .timestamp() yields seconds as a float; the values are returned in ms.
    start_ts = int(start_date.timestamp() * 1000)
    end_ts = int(end_date.timestamp() * 1000)
    return start_ts, end_ts
|
|
|
|
|
|
def fetch_pageviews(token, start_ts, end_ts, website_id, base_url, timeout=30):
    """Request the per-URL metrics of one website from the Umami API.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        start_ts: Value sent as the ``startAt`` query parameter.
        end_ts: Value sent as the ``endAt`` query parameter.
        website_id: Umami website ID inserted into the request path.
        base_url: Root URL of the Umami server.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` waits indefinitely, so a stalled server
            would hang the whole script.

    Returns:
        The ``requests`` response object; callers decode it with ``.json()``.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    headers = {"Authorization": f"Bearer {token}"}
    url = f"{base_url}/api/websites/{website_id}/metrics"
    params = {
        "type": "url",
        "startAt": start_ts,
        "endAt": end_ts
    }

    response = requests.get(url, headers=headers, params=params, timeout=timeout)
    response.raise_for_status()
    return response
|
|
|
|
|
|
def filter_unique_service_types(stats, cloud_environment, environment, limit):
    """
    Map the URL entries in ``stats`` to known service types and return the
    first ``limit`` distinct ones.

    Args:
        stats: Response-like object whose ``.json()`` returns an iterable of
            mappings; the ``"x"`` value of each mapping is used as the URL
            path to search in.
        cloud_environment: Passed to the metadata lookup as
            ``cloud_environment``.
        environment: Passed to the metadata lookup as ``environments``.
        limit: Maximum number of service types to return. Zero or a
            negative value yields an empty list.

    Returns:
        A list of distinct service types, in the order their URLs first
        appear in ``stats``. Blacklisted service types are never included.
        At most one service type is taken from each URL entry.
    """
    # The limit is otherwise only checked after an append, so without this
    # guard a non-positive limit would still return one entry.
    if limit <= 0:
        return []

    services = otc_metadata.services.Services().all_services_by_cloud_environment(
        cloud_environment=cloud_environment,
        environments=environment
    )

    # Map service_uri -> service_type
    uri_to_type = {s["service_uri"]: s["service_type"] for s in services}

    # Drop blacklisted types and build the "/<service_uri>" search strings
    # once, instead of repeating both for every URL entry.
    excluded = set(blacklist)
    candidates = [
        (f"/{service_uri}", service_type)
        for service_uri, service_type in uri_to_type.items()
        if service_type not in excluded
    ]

    seen_types = set()
    filtered = []

    for entry in stats.json():
        url_path = entry["x"]

        for needle, service_type in candidates:
            if needle not in url_path or service_type in seen_types:
                continue

            filtered.append(service_type)
            seen_types.add(service_type)

            if len(filtered) >= limit:
                return filtered
            # Only the first new match counts for this URL entry.
            break

    return filtered
|
|
|
|
|
|
def save_to_file(data, environment, cloud_environment):
    """
    Write ``data`` as indented UTF-8 JSON to
    otc_metadata/analytics/<environment>/<cloud_environment>.json
    (relative to the current working directory), creating the folder when
    it does not exist yet, and log the absolute path of the written file.
    """
    target_dir = os.path.join("otc_metadata", "analytics", environment)
    target_file = os.path.join(target_dir, f"{cloud_environment}.json")

    os.makedirs(target_dir, exist_ok=True)
    abs_path = os.path.abspath(target_file)

    with open(target_file, "w", encoding="utf-8") as handle:
        json.dump(data, handle, indent=2, ensure_ascii=False)

    logger.info(f"✅ Data saved in: {abs_path}")
|
|
|
|
|
|
def main():
    """
    Run the whole pipeline: parse the arguments, log in to Umami, fetch the
    URL metrics of the last four weeks, reduce them to service types and
    save the result.

    The output file is named after the first ``--environment`` value. Any
    exception is logged and then re-raised.
    """
    try:
        args = parse_args()
        base_url = args.base_url
        cloud_environment = args.cloud_environment
        environments = args.environment

        token = get_umami_token(base_url=base_url)
        start_ts, end_ts = get_4_weeks_range()
        stats = fetch_pageviews(
            token,
            start_ts,
            end_ts,
            website_id=args.website_id,
            base_url=base_url,
        )

        filtered_stats = filter_unique_service_types(
            stats=stats,
            cloud_environment=cloud_environment,
            environment=environments,
            limit=args.limit,
        )

        save_to_file(
            data=filtered_stats,
            environment=environments[0],
            cloud_environment=cloud_environment,
        )
    except Exception as e:
        logger.error(f"Error: {e}")
        raise
|
|
|
|
|
|
# Run the pipeline only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
|