JavaScriptで作成されたページからPDFダウンロード
pip install selenium
でインストール
from selenium import webdriver # Safari WebDriverのインスタンスを作成 driver = webdriver.Safari() # Googleのホームページにアクセス driver.get("https://www.google.com") # ウェブページのタイトルをコンソールに表示 print(driver.title) # ブラウザを閉じる driver.quit()
で実行できたので
safariでseleniumで行ってみて
成功したらubuntu でchrome などで行うようにしてみる
とりあえずまずはリンクを表示できるかテスト
まず、SafariのWebDriverを有効にし、SeleniumでSafariを使用する準備を整える必要があることを思い出してください。これには、Safariの「開発」メニューで「リモートオートメーションを許可」を有効にする手順が含まれます。
これは
Safari > 設定 > 詳細タブをクリック
Webデベロッパ用の機能を表示をチェックし
デベロッパタブをクリックし
リモートオートメーションを許可にチェックを入れることでできる
from selenium import webdriver from selenium.webdriver.common.by import By # WebDriverの初期化(Safari) driver = webdriver.Safari() # ページにアクセス driver.get("https://message.cocoo.education/message/ファイル番号") # ページが完全にロードされるまで待機 driver.implicitly_wait(10) # 秒 # 'プレビュー'リンクを探す try: preview_link = driver.find_element(By.LINK_TEXT, "プレビュー") print("プレビューリンク:", preview_link.get_attribute('href')) except: print("プレビューリンクが見つかりませんでした。") # ブラウザを閉じる driver.quit()
これでブラウザが立ち上がり
ログにPDFのリンクが表示される
つまりselenium ならURLの取得が可能
なのでPDF取得まで行うようにする
しかし
from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC driver = webdriver.Safari() driver.get("https://message.cocoo.education/message/ファイル番号") try: # 要素が見えるまで最大10秒間待機します。 preview_button = WebDriverWait(driver, 10).until( EC.visibility_of_element_located((By.XPATH, '//button[contains(text(),"プレビュー")]')) ) preview_button.click() except Exception as e: print("エラーが発生しました:", e) finally: driver.quit()
というようにページの動作を待つように変更したが変わらない
このためやはり機能を分割し
まずgmail からURLリンクを取得
次にこのURLからseleniumでプレビューボタンおURLを取得
取得したボタンURLからPDFを取得
としてみる
とりあえずPDFのリンクを取得するため
バックグランドでseleniumを実行し
PDFのリンクをテキストとして入れるメソッドに書き換える
from selenium import webdriver from selenium.webdriver.common.by import By # WebDriverの初期化(Safari) driver = webdriver.Safari() # ページにアクセス driver.get("https://message.cocoo.education/message/ファイル番号") # ページが完全にロードされるまで待機 driver.implicitly_wait(10) # 秒 # 'プレビュー'リンクを探す try: preview_link = driver.find_element(By.LINK_TEXT, "プレビュー") print("プレビューリンク:", preview_link.get_attribute('href')) except: print("プレビューリンクが見つかりませんでした。") # ブラウザを閉じる driver.quit()
しかしsafariだとヘッドレスモードがない
まぁ試作品なので
とりあえずこれをモジュールにするだけにしておく
from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC def find_preview_link(url, link_text="プレビュー"): # WebDriverの初期化(Safari) driver = webdriver.Safari() try: # ページにアクセス driver.get(url) # ページが完全にロードされるまで待機 driver.implicitly_wait(10) # 秒 # 'プレビュー'リンクを探す preview_link = WebDriverWait(driver, 10).until( EC.visibility_of_element_located((By.LINK_TEXT, link_text)) ) # リンクのURLを返す return preview_link.get_attribute('href') except Exception as e: print(f"エラーが発生しました: {e}") return None finally: # ブラウザを閉じる driver.quit() # 使用例 url = "https://message.cocoo.education/message/1ff451cda4cd705697e71734637ec2b5b5d4152d447203420f52da4a35cd8223" link_url = find_preview_link(url) if link_url: print("プレビューリンク:", link_url) else: print("プレビューリンクが見つかりませんでした。")
これでモジュールにできる
次にこのPDFのリンクを使いPDFをダウンロードする
from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC import requests def find_preview_link(url, link_text="プレビュー"): driver = webdriver.Safari() try: driver.get(url) driver.implicitly_wait(10) # 秒 preview_link = WebDriverWait(driver, 10).until( EC.visibility_of_element_located((By.LINK_TEXT, link_text)) ) return preview_link.get_attribute('href') except Exception as e: print(f"エラーが発生しました: {e}") return None finally: driver.quit() # 使用例 url = "https://message.cocoo.education/message/ファイル番号" pdf_url = find_preview_link(url) if pdf_url: print("プレビューリンク:", pdf_url) # ダウンロードしたいPDFファイルのURL response = requests.get(pdf_url) # レスポンスのステータスコードが200(成功)の場合、ファイルを保存 if response.status_code == 200: with open("downloaded_file.pdf", "wb") as f: f.write(response.content) print("ファイルが正常にダウンロードされました。") else: print("ファイルのダウンロードに失敗しました。ステータスコード:", response.status_code) else: print("プレビューリンクが見つかりませんでした。")
これでOK
次にこのメソッドの最初の取得対象URLをgmailから取得する
以前
from __future__ import print_function import os.path import re from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials import base64 import email import dateutil.parser # スコープの設定 SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] # トークンとクレデンシャルのパス tokenPath = "token.json" credentialsPath = "credentials.json" # メール本文のデコード関数 def decode(encoded): decoded = base64.urlsafe_b64decode(encoded).decode() return decoded # URLを削除する関数 def remove_urls(text): # URLにマッチする正規表現パターン url_pattern = r'https?://\S+|www\.\S+' return re.sub(url_pattern, '', text) # 最新の未読メール本文を取得する関数 def gmail_get_latest_unread_message_body(service, labelIdsValue): messages = service.users().messages() msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread", maxResults=1).execute() if 'messages' not in msg_list: return "No unread messages found." # 最新のメッセージ(最新の未読メール)を選択 msg = msg_list['messages'][0] date = gmail_get_messages_body_date(messages, msg) topid = msg['id'] msg = messages.get(userId='me', id=topid).execute() # メールの本文を取得 body = "" if msg["payload"]["body"]["size"] != 0: body = decode(msg["payload"]["body"]["data"]) elif 'parts' in msg["payload"]: # メール本文が parts 属性にある場合 for part in msg["payload"]["parts"]: if part["body"]["size"] != 0: body = decode(part["body"]["data"]) break if not body: return date + "<br> No body content" # URLを削除 body_no_urls = remove_urls(body) return date + "<br>" + body_no_urls # メールの受信日時を取得する関数(変更なし) def gmail_get_messages_body_date(messages, msg): msg_id = msg['id'] m = messages.get(userId='me', id=msg_id, format='raw').execute() raw = base64.urlsafe_b64decode(m['raw']) eml = email.message_from_bytes(raw) date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S") return date # Gmail API 初期化関数(変更なし) def gmail_init(): creds = None if os.path.exists(tokenPath): creds = Credentials.from_authorized_user_file(tokenPath, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( credentialsPath, SCOPES) creds = flow.run_local_server(port=0) with open(tokenPath, 'w') as token: token.write(creds.to_json()) service = build('gmail', 'v1', credentials=creds) return service # メイン処理 service = gmail_init() # ラベル ID を指定して最新の未読メール本文を取得 latest_unread_message_body = gmail_get_latest_unread_message_body(service, "ラベルのID") print(latest_unread_message_body)
でURLを削除した本文を表示するものを作成した
これをURLと本文をそれぞれの変数に格納して返すようなメソッドに書き換える
import os.path import re from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials import base64 import email import dateutil.parser # スコープの設定 SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] # トークンとクレデンシャルのパス tokenPath = "token.json" credentialsPath = "credentials.json" # メール本文のデコード関数 def decode(encoded): decoded = base64.urlsafe_b64decode(encoded).decode() return decoded # URLを検出してリストに格納する関数 def extract_urls(text): url_pattern = r'https?://\S+|www\.\S+' urls = re.findall(url_pattern, text) text_no_urls = re.sub(url_pattern, '', text) return urls, text_no_urls # 最新の未読メール本文とURLを取得する関数 def gmail_get_latest_unread_message_body(service, labelIdsValue): messages = service.users().messages() msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread", maxResults=1).execute() if 'messages' not in msg_list: return "No unread messages found.", [] msg = msg_list['messages'][0] date = gmail_get_messages_body_date(messages, msg) topid = msg['id'] msg = messages.get(userId='me', id=topid).execute() body = "" if msg["payload"]["body"]["size"] != 0: body = decode(msg["payload"]["body"]["data"]) elif 'parts' in msg["payload"]: for part in msg["payload"]["parts"]: if part["body"]["size"] != 0: body = decode(part["body"]["data"]) break if not body: return date + "<br> No body content", [] urls, body_no_urls = extract_urls(body) return date + "<br>" + body_no_urls, urls def gmail_get_messages_body_date(messages, msg): msg_id = msg['id'] m = messages.get(userId='me', id=msg_id, format='raw').execute() raw = base64.urlsafe_b64decode(m['raw']) eml = email.message_from_bytes(raw) date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S") return date def gmail_init(): creds = None if os.path.exists(tokenPath): creds = Credentials.from_authorized_user_file(tokenPath, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( credentialsPath, SCOPES) creds = flow.run_local_server(port=0) with open(tokenPath, 'w') as token: token.write(creds.to_json()) service = build('gmail', 'v1', credentials=creds) return service service = gmail_init() # ラベル ID を指定して最新の未読メール本文とURLを取得 latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID") print(latest_unread_message_body) print(urls)
これで本文とURLの分離取得ができた
次にこのメソッドで得られたURLからPDFをダウンロードするようにしたい
その前にモジュールかしておく
vim gmail_utils.py
中身を
# gmail_utils.py import os.path import re from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials import base64 import email import dateutil.parser # スコープの設定 SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] # メール本文のデコード関数 def decode(encoded): decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII')) decoded_message = decoded_bytes.decode('utf-8') return decoded_message # URLを検出してリストに格納する関数 def extract_urls(text): url_pattern = r'https?://\S+|www\.\S+' urls = re.findall(url_pattern, text) text_no_urls = re.sub(url_pattern, '', text) return urls, text_no_urls # 最新の未読メール本文とURLを取得する関数 def gmail_get_latest_unread_message_body(service, labelIdsValue): messages = service.users().messages() msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute() if 'messages' not in msg_list: return "No unread messages found.", [] msg = msg_list['messages'][0] msg_id = msg['id'] msg = messages.get(userId='me', id=msg_id, format='full').execute() date = gmail_get_messages_body_date(msg) body = "" if 'parts' in msg['payload']: for part in msg['payload']['parts']: if part['mimeType'] == 'text/plain' and part['body']['size'] > 0: body = decode(part['body']['data']) break else: body = decode(msg['payload']['body']['data']) urls, body_no_urls = extract_urls(body) return date + "<br>" + body_no_urls, urls # メールの受信日時を取得する関数 def gmail_get_messages_body_date(msg): headers = msg['payload']['headers'] date_header = next(header['value'] for header in headers if header['name'].lower() == 'date') date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d_%H-%M-%S") return date # Gmail API 初期化関数 def gmail_init(): creds = None if os.path.exists('token.json'): creds = Credentials.from_authorized_user_file('token.json', SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( 'credentials.json', SCOPES) creds = flow.run_local_server(port=0) with open('token.json', 'w') as token: token.write(creds.to_json()) service = build('gmail', 'v1', credentials=creds) return service
として
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body # Gmail API サービスを初期化 service = gmail_init() # ラベルIDを指定して最新の未読メール本文とURLを取得 latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "INBOX") print(latest_unread_message_body) for url in urls: print(url)
のINBOXをラベルIDに変える
これで実行すると未読メールが表示される
次にこれを使いPDFのダウンロード
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC import requests def download_pdf_from_gmail_links(): service = gmail_init() # Gmail API サービスを初期化 # ラベル ID を指定して最新の未読メール本文とURLを取得 _, urls = gmail_get_latest_unread_message_body(service, "INBOX") if not urls: print("メールからURLを抽出できませんでした。") return for url in urls: pdf_url = find_preview_link(url) if pdf_url: print("プレビューリンク:", pdf_url) try: response = requests.get(pdf_url) if response.status_code == 200: filename = "downloaded_file.pdf" with open(filename, "wb") as f: f.write(response.content) print(f"ファイルが正常にダウンロードされました: {filename}") else: print(f"ファイルのダウンロードに失敗しました。ステータスコード: {response.status_code}") except Exception as e: print(f"ダウンロード中にエラーが発生しました: {e}") else: print("プレビューリンクが見つかりませんでした。") download_pdf_from_gmail_links()
コードを大幅に変更
# gmail_utils.py import os.path import re from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials import base64 import email import dateutil.parser # スコープの設定 SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] def decode(encoded): decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII')) decoded_message = decoded_bytes.decode('utf-8') return decoded_message def extract_urls(text): url_pattern = r'https?://\S+|www\.\S+' urls = re.findall(url_pattern, text) text_no_urls = re.sub(url_pattern, '', text) return urls, text_no_urls def gmail_get_latest_unread_message_body(service, labelIdsValue): messages = service.users().messages() msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute() if 'messages' not in msg_list: return "No unread messages found.", [] msg = msg_list['messages'][0] msg_id = msg['id'] msg = messages.get(userId='me', id=msg_id, format='full').execute() date = gmail_get_messages_body_date(msg) body = "" if 'parts' in msg['payload']: for part in msg['payload']['parts']: if part['mimeType'] == 'text/plain' and part['body']['size'] > 0: body = decode(part['body']['data']) break else: body = decode(msg['payload']['body']['data']) urls, body_no_urls = extract_urls(body) return date + "<br>" + body_no_urls, urls def gmail_get_messages_body_date(msg): headers = msg['payload']['headers'] date_header = next(header['value'] for header in headers if header['name'].lower() == 'date') date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d_%H-%M-%S") return date def gmail_init(): creds = None if os.path.exists('token.json'): creds = Credentials.from_authorized_user_file('token.json', SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( 'credentials.json', SCOPES) creds = flow.run_local_server(port=0) with open('token.json', 'w') as token: token.write(creds.to_json()) service = build('gmail', 'v1', credentials=creds) return service
# gmail_utils.py import os.path import re from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials import base64 import email import dateutil.parser # スコープの設定 SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] def decode(encoded): decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII')) decoded_message = decoded_bytes.decode('utf-8') return decoded_message def extract_urls(text): url_pattern = r'https?://\S+|www\.\S+' urls = re.findall(url_pattern, text) text_no_urls = re.sub(url_pattern, '', text) return urls, text_no_urls def gmail_get_latest_unread_message_body(service, labelIdsValue): messages = service.users().messages() msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute() if 'messages' not in msg_list: return "No unread messages found.", [] msg = msg_list['messages'][0] msg_id = msg['id'] msg = messages.get(userId='me', id=msg_id, format='full').execute() date = gmail_get_messages_body_date(msg) body = "" if 'parts' in msg['payload']: for part in msg['payload']['parts']: if part['mimeType'] == 'text/plain' and part['body']['size'] > 0: body = decode(part['body']['data']) break else: body = decode(msg['payload']['body']['data']) urls, body_no_urls = extract_urls(body) return date + "<br>" + body_no_urls, urls def gmail_get_messages_body_date(msg): headers = msg['payload']['headers'] date_header = next(header['value'] for header in headers if header['name'].lower() == 'date') date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d_%H-%M-%S") return date def gmail_init(): creds = None if os.path.exists('token.json'): creds = Credentials.from_authorized_user_file('token.json', SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( 'credentials.json', SCOPES) creds = flow.run_local_server(port=0) with open('token.json', 'w') as token: token.write(creds.to_json()) service = build('gmail', 'v1', credentials=creds) return service
としたらバグった
とりあえず
gmail_get_text_and_url.py
でのURLと本文の取得
get_pdf.py
でPDFの取得はできた
あとはこの2つを組み合わせるだけ
vim gmail_api_utils.py
でgmailのテキストとURLを取り出す機能をモジュールにして他で呼び出せるようにする
ファイルの内容は
service = gmail_init() # ラベル ID を指定して最新の未読メール本文とURLを取得 latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID") print(latest_unread_message_body) print(urls)
より上の部分のみ抽出して保存
main.py
で
from gmail_api_utils import gmail_init, gmail_get_latest_unread_message_body service = gmail_init() # ラベル ID を指定して最新の未読メール本文とURLを取得 latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID") print(latest_unread_message_body) for url in urls: print(url)
というように
from gmail_api_utils import gmail_init, gmail_get_latest_unread_message_body
でインポートして
service = gmail_init()
で初期化
from gmail_api_utils import gmail_init, gmail_get_latest_unread_message_body service = gmail_init() # ラベル ID を指定して最新の未読メール本文とURLを取得 latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID") print(latest_unread_message_body) for url in urls: print(url)
で呼び出せば同じ効果が出る
同様にPDFダウンロードもモジュールにする
from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC import requests def find_preview_link(url, link_text="プレビュー"): driver = webdriver.Safari() # またはChrome(), Firefox() など、使用するブラウザに合わせて変更 try: driver.get(url) driver.implicitly_wait(10) # 秒 preview_link = WebDriverWait(driver, 10).until( EC.visibility_of_element_located((By.LINK_TEXT, link_text)) ) return preview_link.get_attribute('href') except Exception as e: print(f"エラーが発生しました: {e}") return None finally: driver.quit() def download_pdf(url, file_path="downloaded_file.pdf"): if url: print("プレビューリンク:", url) response = requests.get(url) if response.status_code == 200: with open(file_path, "wb") as f: f.write(response.content) print("ファイルが正常にダウンロードされました。") else: print("ファイルのダウンロードに失敗しました。ステータスコード:", response.status_code) else: print("プレビューリンクが見つかりませんでした。")
これを
from pdf_downloader import find_preview_link, download_pdf # 使用例 url = "https://message.cocoo.education/message/1ff451cda4cd705697e71734637ec2b5b5d4152d447203420f52da4a35cd8223" pdf_url = find_preview_link(url) download_pdf(pdf_url)
として保存し実行すればOK
あとは組み合わせる
しかし
プレビューリンクが
https://s3.ap-northeast-1.amazonaws.com/storage.cocoo.education/ファイル番号
というAmazonS3へのリンク
のはずがgmailから抽出したリンクになっているのでエラー
原因は
# ここでは、プレビューリンクを直接取得する機能がないため、URLがPDFファイルを指していると仮定しています download_pdf(url, file_path="downloaded_file.pdf")
の部分
なのでURLが違う
これを修正する
なおGPTで出力した時にはINBOXになっているため
指定のラベルに変更することが必要になる
main3.py
を
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body from pdf_downloader import find_preview_link, download_pdf def save_email_body_to_text(body, filename="email_body.txt"): with open(filename, "w", encoding="utf-8") as file: file.write(body) def main(): # Gmail API サービスを初期化 service = gmail_init() # ラベル ID を指定して最新の未読メール本文とURLを取得 body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID") if urls: for url in urls: print(f"プレビューリンクを検索するURL: {url}") # プレビューリンクを取得 preview_url = find_preview_link(url) if preview_url: print(f"プレビューリンク: {preview_url}") # プレビューリンクからPDFファイルのダウンロードを試みる download_pdf(preview_url, file_path="downloaded_file.pdf") else: print("プレビューリンクが見つかりませんでした。") else: print("メールにURLが見つかりませんでした。") # メール本文をテキストファイルに保存 save_email_body_to_text(body) if __name__ == "__main__": main()
とすることで
メール本文はテキストファイルへ
PDFはダウンロードすることが完了した