hotpocket/services/backend/hotpocket_backend/apps/ui/services/imports.py
Tomek Wójcik b4338e2769
Some checks failed
CI / Checks (push) Failing after 13m2s
Release v1.0.0
2025-08-20 21:00:50 +02:00

71 lines
2.3 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import annotations
import csv
import datetime
import os
import uuid
from celery.result import AsyncResult
from django import db
from django.utils.timezone import get_current_timezone
from hotpocket_backend.apps.ui.services.workflows import ImportSaveWorkflow
from hotpocket_backend.apps.ui.tasks import import_from_pocket
from hotpocket_common.uuid import uuid7_from_timestamp
class UIImportsService:
    """UI-facing service for importing saves from a Pocket CSV export."""

    def import_from_pocket(self,
                           *,
                           account_uuid: uuid.UUID,
                           csv_path: str,
                           ) -> list[tuple[uuid.UUID, uuid.UUID]]:
        """Import every data row of a Pocket CSV export for an account.

        The file is read as UTF-8 CSV with the columns ``title``, ``url``,
        ``time_added``, ``tags`` and ``status``; the first row is treated as
        a header and skipped. Each remaining row is handed to
        ``ImportSaveWorkflow``. Only ``title``, ``url`` and ``time_added``
        are used; ``tags`` and ``status`` are read but ignored.

        The whole import runs inside one ``db.transaction.atomic()`` block,
        and the CSV file is removed afterwards whether or not the import
        succeeded.

        Args:
            account_uuid: Account the imported saves are created for.
            csv_path: Path of the CSV file to import. The file is deleted
                before this method returns or raises.

        Returns:
            One ``(save.pk, association.pk)`` tuple per imported row, in
            file order.

        Raises:
            OSError: If the file cannot be opened or removed.
            ValueError: If a row's ``time_added`` is not an integer string.
            TypeError: If a row has no ``time_added`` column at all
                (``DictReader`` fills missing columns with ``None``).
        """
        result: list[tuple[uuid.UUID, uuid.UUID]] = []

        with db.transaction.atomic():
            try:
                # newline='' lets the csv module do its own line-ending
                # handling, so newlines embedded in quoted fields (e.g. a
                # multi-line title) are preserved instead of being
                # translated by the text layer.
                with open(
                    csv_path, 'r', encoding='utf-8', newline='',
                ) as csv_file:
                    csv_reader = csv.DictReader(
                        csv_file,
                        fieldnames=['title', 'url', 'time_added', 'tags', 'status'],
                    )

                    # Field names are given explicitly above, so the header
                    # row would otherwise come back as data; drop it. The
                    # default keeps an empty file from raising StopIteration.
                    next(csv_reader, None)

                    current_timezone = get_current_timezone()

                    for row in csv_reader:
                        # Passed to datetime.fromtimestamp() below, i.e.
                        # interpreted as a POSIX timestamp.
                        timestamp = int(row['time_added'])

                        save, association = ImportSaveWorkflow().run(
                            account_uuid=account_uuid,
                            url=row['url'],
                            title=row['title'],
                            # NOTE(review): presumably a UUIDv7 derived from
                            # the original save time - confirm against
                            # hotpocket_common.uuid.
                            pk=uuid7_from_timestamp(timestamp),
                            created_at=datetime.datetime.fromtimestamp(
                                timestamp, tz=current_timezone,
                            ),
                        )

                        result.append((save.pk, association.pk))
            finally:
                # The file is consumed exactly once: remove it on success
                # and on failure alike.
                os.unlink(csv_path)

        return result

    def schedule_import_from_pocket(self,
                                    *,
                                    account_uuid: uuid.UUID,
                                    csv_path: str,
                                    ) -> AsyncResult:
        """Queue the Pocket import as a background task.

        Args:
            account_uuid: Account the imported saves are created for.
            csv_path: Path of the CSV file to import.

        Returns:
            The ``AsyncResult`` returned by ``apply_async`` for the queued
            task.
        """
        # Inside a method body, `import_from_pocket` resolves to the
        # module-level task imported from hotpocket_backend.apps.ui.tasks,
        # not to the method of the same name on this class.
        return import_from_pocket.apply_async(
            kwargs={
                'account_uuid': account_uuid,
                'csv_path': csv_path,
            },
        )