#!/usr/bin/env python3
"""Collection of package support utilities."""
# Core Library modules
import json
import os
import pickle
import re
from importlib.resources import as_file
from pathlib import Path
from typing import Any, Union
# Third party modules
import requests
from colorama import Back, Fore, Style
from packaging import version
from tqdm import tqdm
# First party modules
import pynamer
# Local modules
from . import logger, project_count_file_trv, pypi_index_file_trv
from .config import config
from .exceptions import file_exception, request_exception
[docs]
def check_integrity() -> None:
pass # pragma: no cover
[docs]
def reset() -> None:
pass # pragma: no cover
[docs]
def feedback(message: str, feedback_type: str) -> None:
"""Generates a formatted messages appropriate to the message type.
Args:
message: text to be echoed.
feedback_type: identifies type of message to display.
"""
if feedback_type not in ["null", "nominal", "warning", "error"]:
return
if config.idlemode == 1:
print(message)
else:
if feedback_type == "null":
print(Fore.WHITE + Style.BRIGHT + f"{message}" + Style.RESET_ALL)
elif feedback_type == "nominal":
print(Fore.GREEN + Style.BRIGHT + f"{message}" + Style.RESET_ALL)
elif feedback_type == "warning":
print(
Fore.YELLOW + Back.BLACK + Style.BRIGHT + f"{message}" + Style.RESET_ALL
)
elif feedback_type == "error":
print(
Fore.RED
+ Back.BLACK
+ Style.BRIGHT
+ f"ERROR: {message}"
+ Style.RESET_ALL
)
[docs]
def search_json(json_data: dict, project_name: str) -> str:
"""Searches a json data structure for a GitHub project URL.
The json data is found from the PyPI json URL: "https://pypi.org/pypi/package_name".
The function searches for the GitHub homepage URL:
"https://github.com/{owner}/{package_name} and returns upon first match.
Args:
json_data: json found from PyPI.
project_name: the package name under test.
Returns:
str: the GitHub homepage URL if found else an empty string.
"""
homepage = ""
pattern = re.compile(r"https?://github.com/[\w\-/]+")
def check_value(value: dict) -> None:
if isinstance(value, str):
match = pattern.search(value)
if match and value.endswith(project_name):
nonlocal homepage
homepage = match.group(0)
return
elif isinstance(value, list):
for item in value:
check_value(item)
elif isinstance(value, dict):
for val in value.values():
check_value(val)
check_value(json_data)
return homepage
[docs]
def find_pypirc_file(filename: str = ".pypirc") -> None:
"""Function to iterate over paths in the PATH environment variable to find a file.
Designed to find a .pypirc file starting with the current working directory.
If identified will update the config.pypirc variable, so it can be used elsewhere.
Args:
filename: filename to find.
"""
system_path = os.getenv("PATH")
if system_path is not None:
path_directories = [os.getcwd()]
path_directories.extend(system_path.split(os.pathsep))
for directory in path_directories:
file_path = Path(directory) / filename
if file_path.exists():
logger.debug(
"%s is present in the system's PATH at %s", filename, directory
)
config.pypirc = file_path
break
logger.debug("%s is not present in the system's PATH.", filename)
@request_exception
def generate_pypi_index() -> None:
"""Generates a list of projects in PyPI's simple index - writes results to a file.
Raises:
SystemExit: if any requests.RequestException occurs.
Notes:
A potentially expensive operation as there are almost 500,000 projects to
process. Can take 2-3 seconds. Look to improve performance at a later date:
look at asyncio, asyncio.http etc.
An improvement is to automatically periodically run this in the background.
"""
new_count = 0
pattern = re.compile(r">([\w\W]*?)<")
with as_file(pypi_index_file_trv) as pypi_index_file:
if pypi_index_file.exists():
pypi_index_file.unlink(missing_ok=True)
progress_bar = tqdm(total=config.project_count)
index_object_raw = requests.get(config.pypi_simple_index_url, timeout=5)
with pypi_index_file_trv.open("a") as file: # type: ignore
for line in index_object_raw.iter_lines():
line = str(line)
project_text = re.search(pattern, line)
if project_text is not None:
new_count += 1
progress_bar.update(1)
project = "".join([project_text.group(1), " \n"])
file.write(project)
progress_bar.close()
with (
as_file(project_count_file_trv) as project_count_file,
project_count_file.open("wb") as f,
):
pickle.dump(new_count, f) # type: ignore[arg-type]
if config.project_count > 0:
diff = new_count - config.project_count
if diff > 0: # pragma: no cover
feedback(
f"Project count has increased by {diff} since last index generation",
"warning",
)
elif diff < 0: # pragma: no cover
feedback(
f"Project count has decreased by {diff} since last index generation",
"warning",
)
@request_exception
def check_version() -> None:
"""Utility function to compare package version against latest version on PyPI.
Returns:
current_version: version of the installed package.
str: message concerning the result of the comparison.
bool: True: if the installed package is up-to-date.
False: if there is a newer version on PyPI.
Raises:
SystemExit: if any requests.RequestException occurs.
"""
url_json = "".join([config.pypi_json_url, "pynamer", "/json"])
current_version = version.parse(pynamer.__version__)
project_json_raw = requests.get(url_json, timeout=5)
if project_json_raw.status_code == 200:
project_json = json.loads(project_json_raw.content)
pypi_version = version.parse(project_json["info"]["version"])
if pypi_version > current_version:
message = f"(There is a newer version available: {pypi_version})"
feedback(f"{current_version} : {message}", "warning")
elif pypi_version == current_version:
message = "(You have the most recent version)"
feedback(f"{current_version} : {message}", "nominal")
@file_exception
def process_input_file(file: str) -> list[Union[str, Any]]:
"""Processes the contents of the file to a list of strings.
Args:
file: simple string for the file.
Raises:
SystemExit: if there is an error opening the file.
Notes:
File contents should contain any number of space separated strings on any
number of lines.
"""
file_path = Path(file)
with file_path.open(mode="r") as f:
file_contents = f.read()
projects = file_contents.split()
return list(set(projects))
@file_exception
def write_output_file(file_name: str, results: dict) -> None:
"""Write the results to a file.
Args:
file_name: name of file to save as a simple string.
results: dictionary containing the test results e.g.
{"pynball": [1, 1, 1]}
Raises:
SystemExit: if there is an error opening the file.
"""
header_width = 83
truncation_width = 25
file_path = Path(file_name)
title = "Results from pynamer PyPI utility\n"
title = "".join([title, "=" * header_width, "\n\n"])
title = "".join(
[
title,
"Test 1 - Basic url lookup on PyPI\n",
"Test 2 - Search of PyPIs simple index\n",
"Test 3 - Search using an request to PyPIs search 'API'\n\n",
]
)
header = f"{'Project':30}{'Test1':12}{'Test2':12}{'Test3':12}{'Conclusion'}\n"
header = "".join([header, "=" * header_width, "\n"])
projects_results: str = ""
for project in results:
project_name = (
project
if len(project) <= truncation_width
else project[: truncation_width - 3] + "..."
)
projects_results = "".join([projects_results, f"{project_name:30}"])
for test in results[project]:
test = "Found" if test == 1 else "Not Found"
projects_results = "".join([projects_results, f"{test:12}"])
conclusion = "Not Available" if sum(results[project]) > 0 else "Available"
projects_results = "".join([projects_results, f"{conclusion}"])
projects_results = "".join([projects_results, "\n", "-" * header_width, "\n"])
final_output_text = "".join([title, header, projects_results])
with file_path.open(mode="w") as f:
f.write(final_output_text)