From 0cb92eab9cc36aa43df72605ce901c29945301f6 Mon Sep 17 00:00:00 2001
From: Mohammad Amin
Date: Tue, 20 May 2025 14:45:19 +0330
Subject: [PATCH 1/4] feat: Added S3 support for MediaWiki ETL!

---
 .env.example                         |   9 +-
 .gitignore                           |   5 +-
 docker-compose.dev.yml               |  41 ++++++++
 hivemind_etl/mediawiki/activities.py |  65 +++++++++---
 hivemind_etl/mediawiki/workflows.py  |  11 +-
 hivemind_etl/storage/s3_client.py    | 145 +++++++++++++++++++++++++++
 requirements.txt                     |   2 +
 7 files changed, 259 insertions(+), 19 deletions(-)
 create mode 100644 hivemind_etl/storage/s3_client.py

diff --git a/.env.example b/.env.example
index 1a22b5a..83dc131 100644
--- a/.env.example
+++ b/.env.example
@@ -28,4 +28,11 @@ POSTGRES_PWD=
 POSTGRES_SEEDS=
 POSTGRES_USER=
 
-PROXY_URL=
\ No newline at end of file
+PROXY_URL=
+
+AWS_ENDPOINT_URL=
+AWS_ACCESS_KEY_ID=
+AWS_SECRET_ACCESS_KEY=
+AWS_S3_BUCKET=
+AWS_REGION=
+AWS_SECURE=
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index cce1800..397cafc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -165,4 +165,7 @@ main.ipynb
 
 *.xml
 
-dump_*
\ No newline at end of file
+dump_*
+
+minio_data/
+dumps/*
\ No newline at end of file
diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 1f5751a..a884d88 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -13,8 +13,17 @@ services:
         condition: service_healthy
       redis:
         condition: service_healthy
+      minio:
+        condition: service_healthy
     networks:
       - temporal-network
+    environment:
+      - AWS_ENDPOINT_URL=http://minio:9000
+      - AWS_ACCESS_KEY_ID=minioadmin
+      - AWS_SECRET_ACCESS_KEY=minioadmin
+      - AWS_S3_BUCKET=hivemind-etl
+      - AWS_REGION=us-east-1
+      - AWS_SECURE=false
 
   temporal:
     image: temporalio/auto-setup:1.25.2.0
@@ -120,6 +129,38 @@
     networks:
       - temporal-network
 
+  minio:
+    image: minio/minio:RELEASE.2025-04-22T22-12-26Z
+    ports:
+      - "9000:9000" # API
+      - "9001:9001" # Console
+    environment:
+      MINIO_ROOT_USER: ${AWS_ACCESS_KEY_ID:-minioadmin}
+      MINIO_ROOT_PASSWORD: ${AWS_SECRET_ACCESS_KEY:-minioadmin}
+    volumes:
+      - ./minio_data:/data
+    command: server /data --console-address ":9001"
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
+      interval: 30s
+      timeout: 20s
+      retries: 3
+    networks:
+      - temporal-network
+
+  minio-healthcheck:
+    image: curlimages/curl:8.11.0
+    entrypoint: ["/bin/sh", "-c", "--", "while true; do sleep 30; done;"]
+    depends_on:
+      - minio
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://minio:9000/minio/health/live"]
+      interval: 10s
+      timeout: 2s
+      retries: 5
+    networks:
+      - temporal-network
+
 networks:
   temporal-network:
     driver: bridge
diff --git a/hivemind_etl/mediawiki/activities.py b/hivemind_etl/mediawiki/activities.py
index 64f5060..cdf0e56 100644
--- a/hivemind_etl/mediawiki/activities.py
+++ b/hivemind_etl/mediawiki/activities.py
@@ -6,6 +6,7 @@
 with workflow.unsafe.imports_passed_through():
     from hivemind_etl.mediawiki.module import ModulesMediaWiki
     from hivemind_etl.mediawiki.etl import MediawikiETL
+    from hivemind_etl.storage.s3_client import S3Client
     from llama_index.core import Document
 
 
@@ -53,7 +54,9 @@
 
 @activity.defn
 async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
-    """Extract data from MediaWiki API URL."""
+    """
+    Extract data from MediaWiki API URL and store in S3.
+ """ try: community_id = mediawiki_platform["community_id"] api_url = mediawiki_platform["base_url"] @@ -69,7 +72,8 @@ async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None: platform_id=platform_id, ) mediawiki_etl.extract(api_url=api_url) - logging.info(f"Completed extraction for community {community_id}") + + logging.info(f"Completed extraction for community {community_id}!") except Exception as e: community_id = mediawiki_platform["community_id"] logging.error(f"Error in extraction for community {community_id}: {str(e)}") @@ -79,9 +83,20 @@ async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None: @activity.defn async def transform_mediawiki_data( mediawiki_platform: dict[str, Any], -) -> list[Document]: - """Transform the extracted MediaWiki data.""" +) -> str: + """ + Transform the extracted MediaWiki data and store in S3. + Parameters + ---------- + mediawiki_platform : dict[str, Any] + The platform configuration + + Returns + ------- + str + The S3 key where the transformed data is stored + """ community_id = mediawiki_platform["community_id"] platform_id = mediawiki_platform["platform_id"] try: @@ -93,25 +108,51 @@ async def transform_mediawiki_data( namespaces=namespaces, platform_id=platform_id, ) - result = mediawiki_etl.transform() - logging.info(f"Completed transformation for community {community_id}") - return result + + # Transform data using the extracted data from S3 + documents = mediawiki_etl.transform() + + s3_client = S3Client() + # Store transformed data in S3 + transformed_key = s3_client.store_transformed_data(community_id, documents) + + logging.info( + f"Completed transformation for community {community_id} and stored in S3 with key: {transformed_key}" + ) + return transformed_key except Exception as e: logging.error(f"Error in transformation for community {community_id}: {str(e)}") raise @activity.defn -async def load_mediawiki_data(mediawiki_platform: dict[str, Any]) -> None: - """Load the transformed MediaWiki data into the database.""" +async def load_mediawiki_data( + mediawiki_platform: dict[str, Any], +) -> None: + """ + Load the transformed MediaWiki data into the database. 
+
+    Parameters
+    ----------
+    mediawiki_platform : dict[str, Any]
+        The platform configuration
+    """
     community_id = mediawiki_platform["community_id"]
     platform_id = mediawiki_platform["platform_id"]
     namespaces = mediawiki_platform["namespaces"]
+    transformed_data_key = mediawiki_platform["transformed_data_key"]
     try:
-        documents_dict = mediawiki_platform["documents"]
-        # temporal had converted them to dicts, so we need to convert them back to Document objects
-        documents = [Document.from_dict(doc) for doc in documents_dict]
+        # Get transformed data from S3
+        s3_client = S3Client()
+        transformed_data = s3_client.get_data_by_key(transformed_data_key)
+
+        if not transformed_data:
+            raise ValueError(
+                f"No transformed data found in S3 for community {community_id}"
+            )
+
+        # Convert dict data back to Document objects
+        documents = [Document.from_dict(doc) for doc in transformed_data]
 
         logging.info(f"Starting data load for community {community_id}")
         mediawiki_etl = MediawikiETL(
diff --git a/hivemind_etl/mediawiki/workflows.py b/hivemind_etl/mediawiki/workflows.py
index 6d1ae94..c23856b 100644
--- a/hivemind_etl/mediawiki/workflows.py
+++ b/hivemind_etl/mediawiki/workflows.py
@@ -46,7 +46,8 @@ async def run(self, platform_id: str | None = None) -> None:
                     "namespaces": platform["namespaces"],
                     "platform_id": platform["platform_id"],
                 }
-                # Extract data from MediaWiki
+
+                # Extract data from MediaWiki and store in S3
                 await workflow.execute_activity(
                     extract_mediawiki,
                     mediawiki_platform,
@@ -57,8 +58,8 @@ async def run(self, platform_id: str | None = None) -> None:
                     ),
                 )
 
-                # Transform the extracted data
-                documents = await workflow.execute_activity(
+                # Transform the extracted data and store in S3
+                transformed_data_key = await workflow.execute_activity(
                     transform_mediawiki_data,
                     mediawiki_platform,
                     start_to_close_timeout=timedelta(hours=6),
@@ -68,8 +69,8 @@ async def run(self, platform_id: str | None = None) -> None:
                     ),
                 )
 
-                mediawiki_platform["documents"] = documents
-                # Load the transformed data
+                mediawiki_platform["transformed_data_key"] = transformed_data_key
+                # Load the transformed data from S3
                 await workflow.execute_activity(
                     load_mediawiki_data,
                     mediawiki_platform,
diff --git a/hivemind_etl/storage/s3_client.py b/hivemind_etl/storage/s3_client.py
new file mode 100644
index 0000000..3ffcdab
--- /dev/null
+++ b/hivemind_etl/storage/s3_client.py
@@ -0,0 +1,145 @@
+import os
+import json
+import logging
+from datetime import datetime, timezone
+from typing import Any, Dict, List
+
+import boto3
+from botocore.config import Config
+from botocore.exceptions import ClientError
+from llama_index.core import Document
+
+
+class S3Client:
+    def __init__(self):
+        # Get AWS S3 environment variables
+        self.endpoint_url = os.getenv("AWS_ENDPOINT_URL")
+        self.access_key = os.getenv("AWS_ACCESS_KEY_ID")
+        self.secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
+        self.bucket_name = os.getenv("AWS_S3_BUCKET")
+        self.region = os.getenv("AWS_REGION", "us-east-1")
+        self.secure = os.getenv("AWS_SECURE", "true").lower() == "true"
+
+        # Check each required variable and log if missing
+        missing_vars = []
+        if not self.endpoint_url:
+            missing_vars.append("AWS_ENDPOINT_URL")
+        if not self.access_key:
+            missing_vars.append("AWS_ACCESS_KEY_ID")
+        if not self.secret_key:
+            missing_vars.append("AWS_SECRET_ACCESS_KEY")
+        if not self.bucket_name:
+            missing_vars.append("AWS_S3_BUCKET")
+
+        if missing_vars:
+            error_msg = (
+                f"Missing required environment variables: {', '.join(missing_vars)}"
+            )
+            logging.error(error_msg)
+            raise ValueError(error_msg)
+
+        logging.info(
+            f"Initializing S3 client with endpoint: {self.endpoint_url}, "
+            f"bucket: {self.bucket_name}, region: {self.region}, secure: {self.secure}"
+        )
+
+        # Configure S3 client
+        config = Config(
+            signature_version="s3v4",
+            region_name=self.region,
+        )
+
+        self.s3_client = boto3.client(
+            "s3",
+            endpoint_url=self.endpoint_url,
+            aws_access_key_id=self.access_key,
+            aws_secret_access_key=self.secret_key,
+            config=config,
+            verify=self.secure,
+        )
+
+        # Ensure bucket exists
+        try:
+            self.s3_client.head_bucket(Bucket=self.bucket_name)
+            logging.info(f"Successfully connected to bucket: {self.bucket_name}")
+        except ClientError as e:
+            if e.response["Error"]["Code"] == "404":
+                logging.info(f"Creating bucket: {self.bucket_name}")
+                self.s3_client.create_bucket(
+                    Bucket=self.bucket_name,
+                    CreateBucketConfiguration={"LocationConstraint": self.region},
+                )
+                logging.info(f"Successfully created bucket: {self.bucket_name}")
+            else:
+                logging.error(f"Error accessing bucket {self.bucket_name}: {str(e)}")
+                raise
+
+    def _get_key(self, community_id: str, activity_type: str, timestamp: str) -> str:
+        """Generate a unique S3 key for the data."""
+        return f"{community_id}/{activity_type}/{timestamp}.json"
+
+    def store_extracted_data(self, community_id: str, data: Dict[str, Any]) -> str:
+        """Store extracted data in S3."""
+        timestamp = datetime.now(tz=timezone.utc).isoformat()
+        key = self._get_key(community_id, "extracted", timestamp)
+
+        self.s3_client.put_object(
+            Bucket=self.bucket_name,
+            Key=key,
+            Body=json.dumps(data),
+            ContentType="application/json",
+        )
+        return key
+
+    def store_transformed_data(
+        self, community_id: str, documents: List[Document]
+    ) -> str:
+        """Store transformed documents in S3."""
+        timestamp = datetime.now(tz=timezone.utc).isoformat()
+        key = self._get_key(community_id, "transformed", timestamp)
+
+        # Convert Documents to dict for JSON serialization
+        docs_data = [doc.to_dict() for doc in documents]
+
+        self.s3_client.put_object(
+            Bucket=self.bucket_name,
+            Key=key,
+            Body=json.dumps(docs_data),
+            ContentType="application/json",
+        )
+        return key
+
+    def get_data_by_key(self, key: str) -> Dict[str, Any]:
+        """Get data from S3 using a specific key."""
+        try:
+            obj = self.s3_client.get_object(Bucket=self.bucket_name, Key=key)
+            return json.loads(obj["Body"].read().decode("utf-8"))
+        except ClientError as e:
+            if e.response["Error"]["Code"] == "NoSuchKey":
+                logging.error(f"No data found for key: {key}")
+                raise ValueError(f"No data found for key: {key}")
+            logging.error(f"Error retrieving data for key {key}: {str(e)}")
+            raise
+
+    def get_latest_data(self, community_id: str, activity_type: str) -> Dict[str, Any]:
+        """Get the most recent data for a community and activity type."""
+        prefix = f"{community_id}/{activity_type}/"
+
+        try:
+            response = self.s3_client.list_objects_v2(
+                Bucket=self.bucket_name,
+                Prefix=prefix,
+                MaxKeys=1,
+            )
+
+            if "Contents" not in response:
+                logging.error(f"No data found for prefix: {prefix}")
+                return None
+
+            latest_key = response["Contents"][0]["Key"]
+            return self.get_data_by_key(latest_key)
+
+        except ClientError as e:
+            if e.response["Error"]["Code"] == "NoSuchKey":
+                return None
+            raise
diff --git a/requirements.txt b/requirements.txt
index bc6b6aa..7e9561d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,3 +9,5 @@ pydantic==2.9.2
 motor>=3.6, <4.0.0
 tc-temporal-backend==1.0.0
 wikiteam3-fork-proxy==1.0.0
+boto3>=1.38.19
+botocore>=1.38.19

From 09acc1e52cf75bbd6897e96c34a9112c31699635 Mon Sep 17 00:00:00 2001
From: Mohammad Amin
Date: Tue, 20 May 2025 15:02:35 +0330
Subject: [PATCH 2/4] feat: adjust function docstring!

---
 hivemind_etl/mediawiki/activities.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hivemind_etl/mediawiki/activities.py b/hivemind_etl/mediawiki/activities.py
index cdf0e56..e11ca65 100644
--- a/hivemind_etl/mediawiki/activities.py
+++ b/hivemind_etl/mediawiki/activities.py
@@ -55,7 +55,7 @@ async def get_hivemind_mediawiki_platforms(
 @activity.defn
 async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
     """
-    Extract data from MediaWiki API URL and store in S3.
+    Extract data from MediaWiki API URL
     """
     try:
         community_id = mediawiki_platform["community_id"]

From f4d0360d5a77bec411de43f1b74e0313d1ba6461 Mon Sep 17 00:00:00 2001
From: Mohammad Amin
Date: Tue, 20 May 2025 15:50:01 +0330
Subject: [PATCH 3/4] feat: removed unused code!

---
 hivemind_etl/storage/s3_client.py | 23 -----------------------
 1 file changed, 23 deletions(-)

diff --git a/hivemind_etl/storage/s3_client.py b/hivemind_etl/storage/s3_client.py
index 3ffcdab..73cfad4 100644
--- a/hivemind_etl/storage/s3_client.py
+++ b/hivemind_etl/storage/s3_client.py
@@ -120,26 +120,3 @@ def get_data_by_key(self, key: str) -> Dict[str, Any]:
                 raise ValueError(f"No data found for key: {key}")
             logging.error(f"Error retrieving data for key {key}: {str(e)}")
             raise
-
-    def get_latest_data(self, community_id: str, activity_type: str) -> Dict[str, Any]:
-        """Get the most recent data for a community and activity type."""
-        prefix = f"{community_id}/{activity_type}/"
-
-        try:
-            response = self.s3_client.list_objects_v2(
-                Bucket=self.bucket_name,
-                Prefix=prefix,
-                MaxKeys=1,
-            )
-
-            if "Contents" not in response:
-                logging.error(f"No data found for prefix: {prefix}")
-                return None
-
-            latest_key = response["Contents"][0]["Key"]
-            return self.get_data_by_key(latest_key)
-
-        except ClientError as e:
-            if e.response["Error"]["Code"] == "NoSuchKey":
-                return None
-            raise

From f526f516d24df6cafa80a96c61f9298890ecc854 Mon Sep 17 00:00:00 2001
From: Mohammad Amin
Date: Tue, 20 May 2025 15:57:33 +0330
Subject: [PATCH 4/4] fix: ensure AWS_REGION is required for S3Client initialization!

---
 hivemind_etl/storage/s3_client.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/hivemind_etl/storage/s3_client.py b/hivemind_etl/storage/s3_client.py
index 73cfad4..941972a 100644
--- a/hivemind_etl/storage/s3_client.py
+++ b/hivemind_etl/storage/s3_client.py
@@ -17,7 +17,7 @@ def __init__(self):
         self.access_key = os.getenv("AWS_ACCESS_KEY_ID")
         self.secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
         self.bucket_name = os.getenv("AWS_S3_BUCKET")
-        self.region = os.getenv("AWS_REGION", "us-east-1")
+        self.region = os.getenv("AWS_REGION")
         self.secure = os.getenv("AWS_SECURE", "true").lower() == "true"
 
         # Check each required variable and log if missing
@@ -30,6 +30,8 @@ def __init__(self):
             missing_vars.append("AWS_SECRET_ACCESS_KEY")
         if not self.bucket_name:
             missing_vars.append("AWS_S3_BUCKET")
+        if not self.region:
+            missing_vars.append("AWS_REGION")
 
         if missing_vars:
             error_msg = (