Skip to content

Commit

Permalink
feat(search.py): update fetch_snippet select classname
Browse files Browse the repository at this point in the history
  • Loading branch information
NorkzYT committed Oct 11, 2023
1 parent f95b6bf commit 5dc4450
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 62 deletions.
155 changes: 94 additions & 61 deletions novexity/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@

class Configuration:
    """Holds the AWS credentials used by the IP-rotating API gateway.

    Values are read from the environment once, at construction time;
    changing the environment afterwards does not affect an existing
    instance.
    """

    def __init__(self):
        # Credentials for the AWS API Gateway used to rotate request IPs.
        # Either may be None if the corresponding variable is unset.
        self.AWS_ACCESS_KEY_ID = os.getenv("GOOGLE_SEARCH_AWS_ACCESS_KEY_ID")
        self.AWS_ACCESS_KEY_SECRET = os.getenv("GOOGLE_SEARCH_AWS_SECRET_ACCESS_KEY")


config = Configuration()  # Module-level configuration, populated from the environment

# The result fields a caller may legitimately request from search().
VALID_KEYS = {"title", "link", "displayed_link", "favicon", "snippet", "source"}

# Result titles that are filtered out of the organic results.
invalid_titles = ["Description"]

Expand All @@ -33,7 +31,13 @@ def __init__(self, params):
self.fields = params.get("fields", [])

def get_dict(self):
    """Run the configured search and return its result.

    Delegates to the module-level search() with the query, fields,
    country, lang, and location captured at construction time; the
    return value is whatever search() returns (results plus gateway).
    """
    return search(
        self.query,
        *self.fields,
        country=self.country,
        lang=self.lang,
        location=self.location,
    )


def configure(aws_access_key_id=None, aws_secret_access_key=None):
Expand Down Expand Up @@ -63,13 +67,15 @@ def format_json_output(data):
return json.dumps(data, indent=4, ensure_ascii=False)


def search(query: str, *fields, country=None, lang="en", location=None, lang_restrict=None):
def search(
query: str, *fields, country=None, lang="en", location=None, lang_restrict=None
):
"""
Searches Google for a given query string and returns the organic search results.
Args:
- query (str): The search term or phrase to look up on Google.
- *fields (str): Fields you want in the results. Options include: 'title',
- *fields (str): Fields you want in the results. Options include: 'title',
'link', 'displayed_link', 'favicon', 'snippet', 'source'.
Returns:
Expand All @@ -79,19 +85,28 @@ def search(query: str, *fields, country=None, lang="en", location=None, lang_res
- Returns an error dictionary if the search fails.
"""
# Check for the 'position' key and invalid keys:
if 'position' in fields:
return format_json_output({"error": "The 'position' key is not a valid choice."}), None
if "position" in fields:
return (
format_json_output({"error": "The 'position' key is not a valid choice."}),
None,
)
invalid_keys = set(fields) - VALID_KEYS
if invalid_keys:
return format_json_output({"error": f"Invalid keys: {', '.join(invalid_keys)}"}), None
return (
format_json_output({"error": f"Invalid keys: {', '.join(invalid_keys)}"}),
None,
)

headers = {
"User-Agent": 'Mozilla/5.0 (X11; Linux x86_64)'
' AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4454.0 Safari/537.36'
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64)"
" AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4454.0 Safari/537.36"
}

gateway = ApiGateway("https://www.google.com", access_key_id=config.AWS_ACCESS_KEY_ID,
access_key_secret=config.AWS_ACCESS_KEY_SECRET)
gateway = ApiGateway(
"https://www.google.com",
access_key_id=config.AWS_ACCESS_KEY_ID,
access_key_secret=config.AWS_ACCESS_KEY_SECRET,
)
gateway.start()

session = requests.Session()
Expand All @@ -118,84 +133,105 @@ def search(query: str, *fields, country=None, lang="en", location=None, lang_res
try:
response = session.get(url, headers=headers)
if response.status_code == 200:
print('Received a 200 OK response!')
print("Received a 200 OK response!")
break
print('Status Code:', response.status_code, "Switching IP...")
print("Status Code:", response.status_code, "Switching IP...")
except requests.exceptions.Timeout:
print('Request timed out. Switching IP...')
print("Request timed out. Switching IP...")
except requests.ConnectionError:
print('Connection error. Switching IP...')
print("Connection error. Switching IP...")
except Exception as error:
print('An unexpected error occurred:',
error, ". Switching IP...")
print("An unexpected error occurred:", error, ". Switching IP...")

if response.status_code != 200:
return format_json_output({"error": "Failed to retrieve web page."}), gateway
return (
format_json_output({"error": "Failed to retrieve web page."}),
gateway,
)

soup = BeautifulSoup(response.text, 'html.parser')
soup = BeautifulSoup(response.text, "html.parser")

# Check if search results are loaded
if not soup.select_one('div#search'):
if not soup.select_one("div#search"):
return {"error": "Search results not loaded."}, gateway

search_results = []

# Check which fields are requested
fetch_all = not bool(fields)
fetch_position = 'position' in fields or fetch_all
fetch_title = 'title' in fields or fetch_all
fetch_link = 'link' in fields or fetch_all
fetch_displayed_link = 'displayed_link' in fields or fetch_all
fetch_favicon = 'favicon' in fields or fetch_all
fetch_snippet = 'snippet' in fields or fetch_all
fetch_source = 'source' in fields or fetch_all
fetch_position = "position" in fields or fetch_all
fetch_title = "title" in fields or fetch_all
fetch_link = "link" in fields or fetch_all
fetch_displayed_link = "displayed_link" in fields or fetch_all
fetch_favicon = "favicon" in fields or fetch_all
fetch_snippet = "snippet" in fields or fetch_all
fetch_source = "source" in fields or fetch_all

# If any of these fields are fetched, always include position
if any([fetch_title, fetch_link, fetch_displayed_link, fetch_favicon, fetch_snippet, fetch_source]) and 'position' not in fields:
if (
any(
[
fetch_title,
fetch_link,
fetch_displayed_link,
fetch_favicon,
fetch_snippet,
fetch_source,
]
)
and "position" not in fields
):
fetch_position = True

position = 0
for result in soup.select('.g'):
for result in soup.select(".g"):
# Here we build the result_dict based on the fields requested:
result_dict = OrderedDict()

if fetch_position:
position += 1
result_dict["position"] = position

if fetch_title and result.select_one('h3'):
result_dict["title"] = result.select_one('h3').get_text()
if fetch_link and result.select_one('a'):
anchor_tag = result.select_one('a')
if anchor_tag.has_attr('href'):
result_dict["link"] = unquote(anchor_tag['href'].split("&")[
0].replace("/url?q=", ""))
if fetch_title and result.select_one("h3"):
result_dict["title"] = result.select_one("h3").get_text()
if fetch_link and result.select_one("a"):
anchor_tag = result.select_one("a")
if anchor_tag.has_attr("href"):
result_dict["link"] = unquote(
anchor_tag["href"].split("&")[0].replace("/url?q=", "")
)
if fetch_displayed_link:
displayed_link_parts = result.select_one('.TbwUpd')
displayed_link_parts = result.select_one(".TbwUpd")
if displayed_link_parts:
result_dict["displayed_link"] = " ".join(
displayed_link_parts.stripped_strings)
if fetch_favicon and result.select_one('.eqA2re.NjwKYd img'):
result_dict["favicon"] = result.select_one(
'.eqA2re.NjwKYd img')['src']
displayed_link_parts.stripped_strings
)
if fetch_favicon and result.select_one(".eqA2re.NjwKYd img"):
result_dict["favicon"] = result.select_one(".eqA2re.NjwKYd img")["src"]
if fetch_snippet:
snippet_parts = result.select(
'.MUxGbd.yXK7lf.MUxGbd.yDYNvb.lyLwlc')
".VwiC3b.yXK7lf.lyLwlc.yDYNvb.W8l4ac.lEBKkf span"
)

if snippet_parts:
result_dict["snippet"] = ' '.join(
part.get_text() for part in snippet_parts)
result_dict["snippet"] = " ".join(
part.get_text() for part in snippet_parts
)
if fetch_source:
source_element = result.select_one('cite')
source_element = result.select_one("cite")
if source_element:
result_dict["source"] = source_element.get_text()

# At least one essential field (like title or link) must be present
if (result_dict.get("title") or result_dict.get("link")) and result_dict.get("title") not in invalid_titles:
if (
result_dict.get("title") or result_dict.get("link")
) and result_dict.get("title") not in invalid_titles:
# Ensure that the result is unique (based on title and link)
if not any(existing_result.get("link") == result_dict.get("link") and
existing_result.get(
"title") == result_dict.get("title")
for existing_result in search_results):
if not any(
existing_result.get("link") == result_dict.get("link")
and existing_result.get("title") == result_dict.get("title")
for existing_result in search_results
):
search_results.append(result_dict)

# Reset the position values to ensure they are sequential
Expand All @@ -204,17 +240,14 @@ def search(query: str, *fields, country=None, lang="en", location=None, lang_res

# Sort the search_results based on the position key
search_results = sorted(
search_results, key=lambda x: x.get('position', float('inf')))
search_results, key=lambda x: x.get("position", float("inf"))
)

# Create a dictionary to store the final JSON result
json_result = {
"organic_results": search_results
}
json_result = {"organic_results": search_results}

return format_json_output(json_result), gateway

except (ConnectionError, ValueError) as exc:
error = {
"error": str(exc)
}
error = {"error": str(exc)}
return error, gateway
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = Novexity
version = 1.0.4
author = NorkzYT
description = Freely Scrape Google Search Results Fast and Easy ✨
long_description = file: README.md
Expand Down

0 comments on commit 5dc4450

Please sign in to comment.