Page MenuHomeDevCentral

bin/resolve-hash

Authored By
dereckson
Mar 5 2022, 23:22
Size
6 KB
Referenced Files
None
Subscribers
None

bin/resolve-hash

#!/usr/bin/env python3
# -------------------------------------------------------------
# Resolve a hash
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Project: Nasqueron
# Description: Query various sources with a known hash
# like Phabricator, Gerrit or GitHub to offer
# hash information URL from a VCS hash.
# License: BSD-2-Clause
# -------------------------------------------------------------
import json
import os
import sys
import requests
import yaml
# -------------------------------------------------------------
# Phabricator
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def call_phabricator_api(api_url, token, method, parameters):
parameters["api.token"] = token
r = requests.post(api_url + method, data=parameters)
if r.status_code != 200:
return None
result = r.json()["result"]
if result is None:
return None
return result["data"]
def query_phabricator_instance(api_url, token, commit_hash):
result = call_phabricator_api(
api_url,
token,
"diffusion.commit.search",
{"constraints[identifiers][0]": commit_hash},
)
if result is None:
return None
try:
commit = result[0]
except IndexError:
# Query works but didn't find anything
return None
return resolve_phabricator_commit_url(
api_url, token, commit_hash, commit["fields"]["repositoryPHID"]
)
def resolve_phabricator_commit_url(api_url, token, commit_hash, repository_phid):
callsign = query_get_repository_callsign(api_url, token, repository_phid)
return api_url.replace("api/", "") + callsign + commit_hash
def query_get_repository_callsign(api_url, token, repository_phid):
result = call_phabricator_api(
api_url,
token,
"diffusion.repository.search",
{"constraints[phids][0]": repository_phid},
)
return "r" + result[0]["fields"]["callsign"]
def query_phabricator_instances(config_file, commit_hash):
with open(config_file, "r") as fd:
instances = json.load(fd)
for api_url, args in instances["hosts"].items():
url = query_phabricator_instance(api_url, args["token"], commit_hash)
if url is not None:
return url
return None
# -------------------------------------------------------------
# Gerrit
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def query_gerrit_instance(instance, commit_hash):
url = instance + "changes/?q=" + commit_hash
r = requests.get(url)
if r.status_code != 200:
print(r.status_code)
return None
# We can't use r.json() as currently the API starts responses
# by an extra line ")]}'"
payload = r.text.strip().split("\n")[-1]
result = json.loads(payload)
if not result:
return None
change = result[0]
return f"{instance}c/{change['project']}/+/{change['_number']}"
def query_gerrit_instances(instances, commit_hash):
for instance in instances:
url = query_gerrit_instance(instance, commit_hash)
if url is not None:
return url
return None
# -------------------------------------------------------------
# GitHub
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def query_github_instance(instance, commit_hash):
url = f"{instance}/search/commits?q=hash:{commit_hash}"
r = requests.get(url)
if r.status_code != 200:
return None
commits = r.json()["items"]
if not commits:
return None
return commits[0]["html_url"]
# -------------------------------------------------------------
# GitLab
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def query_gitlab_instance(instance, token, commit_hash):
url = f"{instance}api/v4/search?scope=commits&search={commit_hash}"
r = requests.get(url, headers={"PRIVATE-TOKEN": token})
if r.status_code != 200:
return None
commits = r.json()
if not commits:
return None
return commits[0]["web_url"]
# -------------------------------------------------------------
# Configuration
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def parse_config():
try:
return yaml.safe_load(open(os.environ["HOME"] + "/.config/resolvehash.conf"))
except:
return {}
# -------------------------------------------------------------
# Hash search wrapper
#
# I. VCS :: code review and repositories hosting
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class VcsHashSearch:
def __init__(self, config, needle_hash):
self.config = config
self.hash = needle_hash
def search_phabricator(self):
arrc_path = os.environ["HOME"] + "/.arcrc"
if os.path.exists(arrc_path):
return query_phabricator_instances(arrc_path, self.hash)
def search_gerrit(self):
if "gerrit" in self.config:
return query_gerrit_instances(self.config["gerrit"], self.hash)
def search_github(self):
return query_github_instance("https://api.github.com", self.hash)
def search_gitlab(self):
if "gitlab_public_token" in self.config:
return query_gitlab_instance(
"https://gitlab.com/", self.config["gitlab_public_token"], self.hash
)
def get_search_methods(self):
return [
# Strategy A. Code review systems we can autodiscover
self.search_phabricator,
# Strategy B. Sources explicitly configured in configuration
self.search_gerrit,
# Strategy C. Popular public hosting sites like GitHub
self.search_github,
self.search_gitlab,
]
def search(self):
for method in self.get_search_methods():
result = method()
if result:
return result
def get_search_classes():
return [
VcsHashSearch,
]
def find_hash(config, needle_hash):
for search_class in get_search_classes():
result = search_class(config, needle_hash).search()
if result is not None:
return result
# -------------------------------------------------------------
# Application entry point
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def run(needle_hash):
result = find_hash(parse_config(), needle_hash)
if not result:
sys.exit(1)
print(result)
if __name__ == "__main__":
argc = len(sys.argv)
if argc < 2:
print(f"Usage: {sys.argv[0]} <hash>", file=sys.stderr)
sys.exit(1)
run(sys.argv[1])

File Metadata

Mime Type
text/plain; charset=utf-8
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
960425
Default Alt Text
bin/resolve-hash (6 KB)

Event Timeline