Gmailで指定の未読メールの取得
ラベルがshopで
お気に入り店舗新着チラシお知らせメール
の件名
の未読メールを取得する
本文のURLをクリックし
杏林堂なら
/html/body/div[1]/div[3]/div[1]/div/div[2]/div[2]
もしくは
/html/body/div[1]/div[3]/div[1]/div/div[2]
で画像をダウンロードし
複数画像の時には1つに統合する
とりあえずここまでできるようにする
まずは件名で未読のものを取得するようにした
クエリに is:unread を追加しました。この変更により、未読のメールのみが検索対象になります。
import os.path import base64 from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build # 認証情報ファイルのパス CREDENTIALS_FILE = 'path/to/credentials.json' TOKEN_FILE = 'token.json' # Gmail APIのスコープ SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] def main(): # トークンファイルが存在する場合は読み込む creds = None if os.path.exists(TOKEN_FILE): creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES) # 認証が有効でない場合は新しく認証を行う if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES) creds = flow.run_local_server(port=0) # トークンを保存する with open(TOKEN_FILE, 'w') as token: token.write(creds.to_json()) # Gmail APIクライアントを構築 service = build('gmail', 'v1', credentials=creds) # 未読メールを検索 query = 'is:unread subject:"【Shufoo!】お気に入り店舗新着チラシお知らせメール"' results = service.users().messages().list(userId='me', q=query).execute() messages = results.get('messages', []) if not messages: print('No unread messages found.') else: print(f'Found {len(messages)} unread messages:') for msg in messages: msg_id = msg['id'] msg = service.users().messages().get(userId='me', id=msg_id).execute() msg_snippet = msg['snippet'] print(f'Message snippet: {msg_snippet}') if __name__ == '__main__': main()
変更点は
query = 'is:unread subject:"【Shufoo!】お気に入り店舗新着チラシお知らせメール"'
の部分
さらに最新の1件のみ取得するようにコード変更
変更点は
maxResults パラメータの追加:
service.users().messages().list メソッドに maxResults=1 を追加
これにより、検索結果として最新の1件のみが返される
これで実行すると
Found 1 unread message: Message snippet: こちらのメールは「Shufoo!」でお気に入り登録した店舗の新着チラシ掲載開始をお知らせするメールです。 以下、1件の新着チラシが掲載開始されました。 ・ピアゴ袋井店https://www.shufoo.net/pntweb/shopDetail/15782/?cid=nmail_pc ※Shufoo!PCサイトまたは、シュフーチラシアプリ(スマートフォン・タブレット端末用) からログインしてお店の s
となる
次に本文からURLのみ抽出する
ただし複数存在するため最初のURLのみ抽出する
本文からURLを抽出するには、メールの本文を取得し、正規表現を使ってURLを抽出
複数のURLが含まれている場合は、最初のURLのみを抽出
import os.path import base64 import re from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build # 認証情報ファイルのパス CREDENTIALS_FILE = 'path/to/credentials.json' TOKEN_FILE = 'token.json' # Gmail APIのスコープ SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] def extract_first_url(text): """テキストから最初のURLを抽出します。""" url_pattern = r'https?://[^\s]+' urls = re.findall(url_pattern, text) return urls[0] if urls else None def main(): # トークンファイルが存在する場合は読み込む creds = None if os.path.exists(TOKEN_FILE): creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES) # 認証が有効でない場合は新しく認証を行う if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES) creds = flow.run_local_server(port=0) # トークンを保存する with open(TOKEN_FILE, 'w') as token: token.write(creds.to_json()) # Gmail APIクライアントを構築 service = build('gmail', 'v1', credentials=creds) # 最新の未読メール1件を検索 query = 'is:unread subject:"【Shufoo!】お気に入り店舗新着チラシお知らせメール"' results = service.users().messages().list(userId='me', q=query, maxResults=1).execute() messages = results.get('messages', []) if not messages: print('No unread messages found.') else: print(f'Found {len(messages)} unread message:') msg_id = messages[0]['id'] msg = service.users().messages().get(userId='me', id=msg_id, format='full').execute() msg_payload = msg['payload'] msg_parts = msg_payload.get('parts', []) body_data = "" for part in msg_parts: if part['mimeType'] == 'text/plain': # or 'text/html' depending on the format you need body_data = part['body']['data'] break # Base64デコード body_text = base64.urlsafe_b64decode(body_data).decode('utf-8') # URLを抽出 first_url = extract_first_url(body_text) if first_url: print(f'First URL found: {first_url}') else: print('No URL found in the message.') if __name__ == '__main__': main()
ここで問題発生
Found 1 unread message: No URL found in the message.
となる
ChatGPTで調べると
変更点
* extract_first_url関数: 正規表現を使ってテキストから最初のURLを抽出する関数を追加しました。
* メール本文の取得: メールのpayloadから本文のデータ部分を取得し、Base64でデコードしています。
* 本文の解析: デコードした本文から最初のURLを抽出しています。
注意点
* MIMEタイプの確認: メールの本文がtext/plainではなくtext/htmlの場合は、if part[‘mimeType’] == ‘text/plain’: を if part[‘mimeType’] == ‘text/html’: に変更する必要があります。
* 複数部品のメール: 一部のメールは複数のpartsに分かれている場合があります。このコードは最初のtext/plainパートからURLを抽出しますが、複雑なメール構造の場合は調整が必要です。
今回の場合
メール本文からURLを抽出するために、正確なデータ部分をデコードすることが必要です。
Gmail APIで取得するメールの本文は複数のパートに分かれていることがあり、
正しいパートからデコードする必要があります。
また、本文の形式がtext/plainかtext/htmlかを確認し、適切にデコードします
get_message_body 関数:
* メールの本文をすべてのパートから取得し、text/plainまたはtext/htmlの内容をデコードします。
* part[‘body’].get(‘data’) を使ってBase64エンコードされた本文データを取得し、デコードして連結します。
本文の抽出:
* get_message_body関数で取得した本文全体からURLを抽出します。
* 正規表現を使用して、最初に見つかったURLを返します。
メール本文のパートを正しく取得してURLを抽出するようにします。メールの本文パートを順番に確認して、デコード
import os.path import base64 import re from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build # 認証情報ファイルのパス CREDENTIALS_FILE = 'path/to/credentials.json' TOKEN_FILE = 'token.json' # Gmail APIのスコープ SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] def extract_first_url(text): """テキストから最初のURLを抽出します。""" url_pattern = r'https?://[^\s]+' urls = re.findall(url_pattern, text) return urls[0] if urls else None def get_message_body(parts): """メールの本文を取得します。""" body = "" for part in parts: if part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html': body_data = part['body'].get('data') if body_data: body += base64.urlsafe_b64decode(body_data).decode('utf-8') return body def main(): # トークンファイルが存在する場合は読み込む creds = None if os.path.exists(TOKEN_FILE): creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES) # 認証が有効でない場合は新しく認証を行う if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES) creds = flow.run_local_server(port=0) # トークンを保存する with open(TOKEN_FILE, 'w') as token: token.write(creds.to_json()) # Gmail APIクライアントを構築 service = build('gmail', 'v1', credentials=creds) # 最新の未読メール1件を検索 query = 'is:unread subject:"【Shufoo!】お気に入り店舗新着チラシお知らせメール"' results = service.users().messages().list(userId='me', q=query, maxResults=1).execute() messages = results.get('messages', []) if not messages: print('No unread messages found.') else: print(f'Found {len(messages)} unread message:') msg_id = messages[0]['id'] msg = service.users().messages().get(userId='me', id=msg_id, format='full').execute() msg_payload = msg['payload'] # メッセージ本文を取得 msg_body = get_message_body(msg_payload.get('parts', [])) # URLを抽出 first_url = extract_first_url(msg_body) if first_url: print(f'First URL found: {first_url}') else: print('No URL found in the message.') if __name__ == '__main__': main()
しかし結果は
Found 1 unread message: No URL found in the message.
再度ChatGPTで調べる
メールの本文からURLが正しく抽出されない場合、
メールの構造が予想とは異なる可能性があります。
Gmail APIで取得したメールのペイロードを詳しく確認し、
メールの本文がどのように構造化されているかを理解する必要があります。
メールの内容は、partsとして分割されているか、
bodyに直接含まれている場合があります。
どちらの場合でも、メールの本文をデコードしてURLを抽出することが必要です。
以下は、メールペイロードを詳細に確認してURLを抽出するための修正コード
import os.path import base64 import re from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build # 認証情報ファイルのパス CREDENTIALS_FILE = 'path/to/credentials.json' TOKEN_FILE = 'token.json' # Gmail APIのスコープ SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] def extract_first_url(text): """テキストから最初のURLを抽出します。""" url_pattern = r'https?://[^\s]+' urls = re.findall(url_pattern, text) return urls[0] if urls else None def get_message_body(payload): """メールの本文を取得します。""" body = "" if 'data' in payload['body']: # 直接bodyに含まれる場合 body = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8') elif 'parts' in payload: # partsに分割されている場合 for part in payload['parts']: if part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html': body_data = part['body'].get('data') if body_data: body += base64.urlsafe_b64decode(body_data).decode('utf-8') return body def main(): # トークンファイルが存在する場合は読み込む creds = None if os.path.exists(TOKEN_FILE): creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES) # 認証が有効でない場合は新しく認証を行う if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES) creds = flow.run_local_server(port=0) # トークンを保存する with open(TOKEN_FILE, 'w') as token: token.write(creds.to_json()) # Gmail APIクライアントを構築 service = build('gmail', 'v1', credentials=creds) # 最新の未読メール1件を検索 query = 'is:unread subject:"【Shufoo!】お気に入り店舗新着チラシお知らせメール"' results = service.users().messages().list(userId='me', q=query, maxResults=1).execute() messages = results.get('messages', []) if not messages: print('No unread messages found.') else: print(f'Found {len(messages)} unread message:') msg_id = messages[0]['id'] msg = service.users().messages().get(userId='me', id=msg_id, format='full').execute() msg_payload = msg['payload'] # メッセージ本文を取得 msg_body = get_message_body(msg_payload) # URLを抽出 first_url = extract_first_url(msg_body) if first_url: print(f'First URL found: {first_url}') else: print('No URL found in the message.') if __name__ == '__main__': main()
修正点
1. get_message_body関数の改良:
* payload[‘body’]に直接データが含まれる場合も考慮しました。この場合、partsはありません。
* partsがある場合でも、本文がtext/plainかtext/htmlのいずれかである部分を探します。
2. URL抽出の再確認:
* extract_first_url関数で、取得した本文全体から最初のURLを抽出します。
これで
Found 1 unread message: First URL found: https://www.shufoo.net/pntweb/shopDetail/15782/?cid=nmail_pc
というようにURLを抽出できた
これを
Seleniumで開き画像をダウンロードするモジュールの引数に当てるようにする
Xpathはcoopだけ違うようだが
実際には同じだった
ダウンロードファイルはタイムスタンプをつければかぶることはないし
そのままcloud vision api で処理してLINEで送信すれば問題ない
このためクリック処理は不要でそのまま画像をダウンロードするようにする
import os import time import requests from PIL import Image from io import BytesIO from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.safari.service import Service as SafariService from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from datetime import datetime def open_link_in_safari(url): # Safariドライバーを使用してブラウザを起動 service = SafariService() driver = webdriver.Safari(service=service) driver.get(url) time.sleep(3) # リンクを開いた後に3秒間待機 return driver def get_images_from_container(driver, base_xpath): image_urls = [] try: # コンテナ内の画像要素を探す container = WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.XPATH, base_xpath)) ) images = container.find_elements(By.TAG_NAME, 'img') for img in images: src = img.get_attribute('src') # 特定の条件に基づいて画像をフィルタリング if 'index/img' in src: image_urls.append(src) print(f'Found image: {src}') except Exception as e: print(f'Error finding images: {e}') return image_urls def download_images(image_urls): images = [] for i, url in enumerate(image_urls): response = requests.get(url) if response.status_code == 200: image = Image.open(BytesIO(response.content)) images.append(image) print(f'Downloaded image_{i}.jpg') else: print(f'Failed to download {url}') return images def merge_images(images, output_path): widths, heights = zip(*(img.size for img in images)) total_height = sum(heights) max_width = max(widths) combined_image = Image.new('RGB', (max_width, total_height)) y_offset = 0 for img in images: combined_image.paste(img, (0, y_offset)) y_offset += img.height combined_image.save(output_path) print(f'Saved combined image as {output_path}') def main(): url = 'https://www.shufoo.net/pntweb/shopDetail/197728/?cid=nmail_pc' driver = open_link_in_safari(url) # 画像を取得してダウンロードする base_xpath_images = '/html/body/div[1]/div[3]/div[1]/div/div[2]/div[3]' image_urls = get_images_from_container(driver, base_xpath_images) driver.quit() if image_urls: images = download_images(image_urls) if images: # 現在の日付を取得してフォーマット current_date = datetime.now().strftime('%Y%m%d') # カレントディレクトリにimagesフォルダを作成 output_dir = 'images' os.makedirs(output_dir, exist_ok=True) # ディレクトリが存在しない場合は作成 output_path = os.path.join(output_dir, f'combined_image_{current_date}.jpg') merge_images(images, output_path) if __name__ == '__main__': main()
これで無事に画像のダウンロードができた
ただし頻繁にxpathが変わるようなので
設定ファイルを作成し
そこで設定したxpathを
base_xpath_images = '/html/body/div[1]/div[3]/div[1]/div/div[2]/div[3]'
で指定しているxpath になるようにコード変更
設定ファイルは
config.json
だが
既にLINE APIの設定で使っているので
これに
"base_xpath_images": "/html/body/div[1]/div[3]/div[1]/div/div[2]/div[3]"
の項目を追加する
import os import time import json import requests from PIL import Image from io import BytesIO from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.safari.service import Service as SafariService from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from datetime import datetime def load_config(config_file): """設定ファイルからコンフィグを読み込む""" with open(config_file, 'r') as file: config = json.load(file) return config def open_link_in_safari(url): # Safariドライバーを使用してブラウザを起動 service = SafariService() driver = webdriver.Safari(service=service) driver.get(url) time.sleep(3) # リンクを開いた後に3秒間待機 return driver def get_images_from_container(driver, base_xpath): image_urls = [] try: # コンテナ内の画像要素を探す container = WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.XPATH, base_xpath)) ) images = container.find_elements(By.TAG_NAME, 'img') for img in images: src = img.get_attribute('src') # 特定の条件に基づいて画像をフィルタリング if 'index/img' in src: image_urls.append(src) print(f'Found image: {src}') except Exception as e: print(f'Error finding images: {e}') return image_urls def download_images(image_urls): images = [] for i, url in enumerate(image_urls): response = requests.get(url) if response.status_code == 200: image = Image.open(BytesIO(response.content)) images.append(image) print(f'Downloaded image_{i}.jpg') else: print(f'Failed to download {url}') return images def merge_images(images, output_path): widths, heights = zip(*(img.size for img in images)) total_height = sum(heights) max_width = max(widths) combined_image = Image.new('RGB', (max_width, total_height)) y_offset = 0 for img in images: combined_image.paste(img, (0, y_offset)) y_offset += img.height combined_image.save(output_path) print(f'Saved combined image as {output_path}') def main(): # 設定ファイルを読み込む config = load_config('config.json') url = 'https://www.shufoo.net/pntweb/shopDetail/197728/?cid=nmail_pc' driver = open_link_in_safari(url) # 設定ファイルからXPathを取得して画像を取得 base_xpath_images = config['base_xpath_images'] image_urls = get_images_from_container(driver, base_xpath_images) driver.quit() if image_urls: images = download_images(image_urls) if images: # 現在の日付を取得してフォーマット current_date = datetime.now().strftime('%Y%m%d') # カレントディレクトリにimagesフォルダを作成 output_dir = 'images' os.makedirs(output_dir, exist_ok=True) # ディレクトリが存在しない場合は作成 output_path = os.path.join(output_dir, f'combined_image_{current_date}.jpg') merge_images(images, output_path) if __name__ == '__main__': main()
またタイムスタンプをファイル名に追加すれば上書き防止になるので
タイムスタンプのフォーマット: datetime.now().strftime(‘%Y%m%d_%H%M%S’)で、
現在の日付と時刻をフォーマットし、年/月/日 時:分:秒の順にします。
これにより、ファイル名が一意になります。
ファイル名への追加: output_pathの生成時にタイムスタンプを含めることで、
同日に何度実行してもファイルが上書きされないようにしました。
フォルダの作成: 出力先のディレクトリが存在しない場合、自動的に作成されます。
これらを組み込み
import os import time import json import requests from PIL import Image from io import BytesIO from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.safari.service import Service as SafariService from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from datetime import datetime def load_config(config_file): """設定ファイルからコンフィグを読み込む""" with open(config_file, 'r') as file: config = json.load(file) return config def open_link_in_safari(url): # Safariドライバーを使用してブラウザを起動 service = SafariService() driver = webdriver.Safari(service=service) driver.get(url) time.sleep(3) # リンクを開いた後に3秒間待機 return driver def get_images_from_container(driver, base_xpath): image_urls = [] try: # コンテナ内の画像要素を探す container = WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.XPATH, base_xpath)) ) images = container.find_elements(By.TAG_NAME, 'img') for img in images: src = img.get_attribute('src') # 特定の条件に基づいて画像をフィルタリング if 'index/img' in src: image_urls.append(src) print(f'Found image: {src}') except Exception as e: print(f'Error finding images: {e}') return image_urls def download_images(image_urls): images = [] for i, url in enumerate(image_urls): response = requests.get(url) if response.status_code == 200: image = Image.open(BytesIO(response.content)) images.append(image) print(f'Downloaded image_{i}.jpg') else: print(f'Failed to download {url}') return images def merge_images(images, output_path): widths, heights = zip(*(img.size for img in images)) total_height = sum(heights) max_width = max(widths) combined_image = Image.new('RGB', (max_width, total_height)) y_offset = 0 for img in images: combined_image.paste(img, (0, y_offset)) y_offset += img.height combined_image.save(output_path) print(f'Saved combined image as {output_path}') def main(): # 設定ファイルを読み込む config = load_config('config.json') url = 'https://www.shufoo.net/pntweb/shopDetail/197728/?cid=nmail_pc' driver = open_link_in_safari(url) # 設定ファイルからXPathを取得して画像を取得 base_xpath_images = config['base_xpath_images'] image_urls = get_images_from_container(driver, base_xpath_images) driver.quit() if image_urls: images = download_images(image_urls) if images: # 現在の日付と時刻を取得してフォーマット current_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') # カレントディレクトリにimagesフォルダを作成 output_dir = 'images' os.makedirs(output_dir, exist_ok=True) # ディレクトリが存在しない場合は作成 output_path = os.path.join(output_dir, f'combined_image_{current_timestamp}.jpg') merge_images(images, output_path) if __name__ == '__main__': main()
これで実行すると
Found image: https://ipqcache2.shufoo.net/c/2024/08/09/96510937073938/index/img/0_100_0.jpg Found image: https://ipqcache2.shufoo.net/c/2024/08/09/96510937073938/index/img/0_100_1.jpg Downloaded image_0.jpg Downloaded image_1.jpg Saved combined image as images/combined_image_20240809_233141.jpg
となって無事にダウンロードが実行される
次にこれをモジュールにする
# image_downloader.py import os import json import time import requests from PIL import Image from io import BytesIO from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.safari.service import Service as SafariService from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from datetime import datetime def load_config(config_file): """設定ファイルからコンフィグを読み込む""" with open(config_file, 'r') as file: config = json.load(file) return config def open_link_in_safari(url): """指定されたURLをSafariで開く""" service = SafariService() driver = webdriver.Safari(service=service) driver.get(url) time.sleep(3) # リンクを開いた後に3秒間待機 return driver def get_images_from_container(driver, base_xpath): """指定されたXPathから画像URLを取得する""" image_urls = [] try: container = WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.XPATH, base_xpath)) ) images = container.find_elements(By.TAG_NAME, 'img') for img in images: src = img.get_attribute('src') if 'index/img' in src: image_urls.append(src) print(f'Found image: {src}') except Exception as e: print(f'Error finding images: {e}') return image_urls def download_images(image_urls): """画像URLから画像をダウンロードする""" images = [] for i, url in enumerate(image_urls): response = requests.get(url) if response.status_code == 200: image = Image.open(BytesIO(response.content)) images.append(image) print(f'Downloaded image_{i}.jpg') else: print(f'Failed to download {url}') return images def merge_images(images, output_path): """複数の画像を結合して保存する""" if not images: print("No images to merge.") return widths, heights = zip(*(img.size for img in images)) total_height = sum(heights) max_width = max(widths) combined_image = Image.new('RGB', (max_width, total_height)) y_offset = 0 for img in images: combined_image.paste(img, (0, y_offset)) y_offset += img.height combined_image.save(output_path) print(f'Saved combined image as {output_path}') def download_and_merge_images(config_file, url): """画像をダウンロードして結合するメイン関数""" config = load_config(config_file) driver = open_link_in_safari(url) base_xpath_images = config['base_xpath_images'] image_urls = get_images_from_container(driver, base_xpath_images) driver.quit() if image_urls: images = download_images(image_urls) if images: current_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_dir = 'images' os.makedirs(output_dir, exist_ok=True) output_path = os.path.join(output_dir, f'combined_image_{current_timestamp}.jpg') merge_images(images, output_path)
として
image_downloader.py
を作成
ただし、これだと最新のファイルを探すなどの処理が必要になるため
作成したファイル名を返り値として渡すようにすれば
そのファイルに対して
Cloud vision api を実行できるはず
# image_downloader.py import os import json import time import requests from PIL import Image from io import BytesIO from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.safari.service import Service as SafariService from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from datetime import datetime def load_config(config_file): """設定ファイルからコンフィグを読み込む""" with open(config_file, 'r') as file: config = json.load(file) return config def open_link_in_safari(url): """指定されたURLをSafariで開く""" service = SafariService() driver = webdriver.Safari(service=service) driver.get(url) time.sleep(3) # リンクを開いた後に3秒間待機 return driver def get_images_from_container(driver, base_xpath): """指定されたXPathから画像URLを取得する""" image_urls = [] try: container = WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.XPATH, base_xpath)) ) images = container.find_elements(By.TAG_NAME, 'img') for img in images: src = img.get_attribute('src') if 'index/img' in src: image_urls.append(src) print(f'Found image: {src}') except Exception as e: print(f'Error finding images: {e}') return image_urls def download_images(image_urls): """画像URLから画像をダウンロードする""" images = [] for i, url in enumerate(image_urls): response = requests.get(url) if response.status_code == 200: image = Image.open(BytesIO(response.content)) images.append(image) print(f'Downloaded image_{i}.jpg') else: print(f'Failed to download {url}') return images def merge_images(images, output_path): """複数の画像を結合して保存する""" if not images: print("No images to merge.") return widths, heights = zip(*(img.size for img in images)) total_height = sum(heights) max_width = max(widths) combined_image = Image.new('RGB', (max_width, total_height)) y_offset = 0 for img in images: combined_image.paste(img, (0, y_offset)) y_offset += img.height combined_image.save(output_path) print(f'Saved combined image as {output_path}') def download_and_merge_images(config_file, url): """画像をダウンロードして結合するメイン関数""" config = load_config(config_file) driver = open_link_in_safari(url) base_xpath_images = config['base_xpath_images'] image_urls = get_images_from_container(driver, base_xpath_images) driver.quit() if image_urls: images = download_images(image_urls) if images: current_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_dir = 'images' os.makedirs(output_dir, exist_ok=True) output_path = os.path.join(output_dir, f'combined_image_{current_timestamp}.jpg') merge_images(images, output_path) return output_path # 生成されたファイル名を返す return None # 画像がなかった場合はNoneを返す
次にgmailの未読メールからURLを取得する部分もモジュール化する
touch gmail_url_extractor.py
でファイルを作成
import os.path import base64 import re from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build # 認証情報ファイルのパス CREDENTIALS_FILE = 'path/to/credentials.json' TOKEN_FILE = 'token.json' # Gmail APIのスコープ SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] def authenticate_gmail(): """Gmail APIに認証し、サービスを構築します。""" creds = None if os.path.exists(TOKEN_FILE): creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES) creds = flow.run_local_server(port=0) with open(TOKEN_FILE, 'w') as token: token.write(creds.to_json()) service = build('gmail', 'v1', credentials=creds) return service def extract_first_url(text): """テキストから最初のURLを抽出します。""" url_pattern = r'https?://[^\s]+' urls = re.findall(url_pattern, text) return urls[0] if urls else None def get_message_body(payload): """メールの本文を取得します。""" body = "" if 'data' in payload['body']: body = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8') elif 'parts' in payload: for part in payload['parts']: if part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html': body_data = part['body'].get('data') if body_data: body += base64.urlsafe_b64decode(body_data).decode('utf-8') return body def get_first_unread_email_url(subject_query): """指定された件名を持つ未読メールから最初のURLを取得します。""" service = authenticate_gmail() query = f'is:unread subject:"{subject_query}"' results = service.users().messages().list(userId='me', q=query, maxResults=1).execute() messages = results.get('messages', []) if not messages: print('No unread messages found.') return None msg_id = messages[0]['id'] msg = service.users().messages().get(userId='me', id=msg_id, format='full').execute() msg_payload = msg['payload'] msg_body = get_message_body(msg_payload) first_url = extract_first_url(msg_body) return first_url
として保存
使う時には
from gmail_url_extractor import get_first_unread_email_url def main(): subject_query = "【Shufoo!】お気に入り店舗新着チラシお知らせメール" url = get_first_unread_email_url(subject_query) if url: print(f'First URL found: {url}') else: print('No URL found in the message.') if __name__ == '__main__': main()
というように使う
ファイルのダウンロードと生成モジュールは
# example_usage.py from image_downloader import download_and_merge_images def main(): config_file = 'config.json' url = 'https://www.shufoo.net/pntweb/shopDetail/197728/?cid=nmail_pc' output_path = download_and_merge_images(config_file, url) if output_path: print(f"Generated file: {output_path}") # Cloud Vision APIを実行するコードをここに追加 # example: run_cloud_vision_api(output_path) else: print("No image file was created.") if __name__ == '__main__': main()
として使う
とりあえずほとんど準備できたので
あとは
ocr_list.py
で
gmail_url_extractor.py
でURLを取得し
image_downloader.py
で画像を取得
この画像に対してOCRすればOKとなる