Page Menu
Home
DevCentral
Search
Configure Global Search
Log In
Files
F1734120
bin/resolve-hash
No One
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Authored By
dereckson
Mar 5 2022, 23:22
2022-03-05 23:22:04 (UTC+0)
Size
6 KB
Referenced Files
None
Subscribers
None
bin/resolve-hash
View Options
#!/usr/bin/env python3
# -------------------------------------------------------------
# Resolve a hash
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Project: Nasqueron
# Description: Query various sources with a known hash
# like Phabricator, Gerrit or GitHub to offer
# hash information URL from a VCS hash.
# License: BSD-2-Clause
# -------------------------------------------------------------
import
json
import
os
import
sys
import
requests
import
yaml
# -------------------------------------------------------------
# Phabricator
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def
call_phabricator_api
(
api_url
,
token
,
method
,
parameters
):
parameters
[
"api.token"
]
=
token
r
=
requests
.
post
(
api_url
+
method
,
data
=
parameters
)
if
r
.
status_code
!=
200
:
return
None
result
=
r
.
json
()[
"result"
]
if
result
is
None
:
return
None
return
result
[
"data"
]
def
query_phabricator_instance
(
api_url
,
token
,
commit_hash
):
result
=
call_phabricator_api
(
api_url
,
token
,
"diffusion.commit.search"
,
{
"constraints[identifiers][0]"
:
commit_hash
},
)
if
result
is
None
:
return
None
try
:
commit
=
result
[
0
]
except
IndexError
:
# Query works but didn't find anything
return
None
return
resolve_phabricator_commit_url
(
api_url
,
token
,
commit_hash
,
commit
[
"fields"
][
"repositoryPHID"
]
)
def
resolve_phabricator_commit_url
(
api_url
,
token
,
commit_hash
,
repository_phid
):
callsign
=
query_get_repository_callsign
(
api_url
,
token
,
repository_phid
)
return
api_url
.
replace
(
"api/"
,
""
)
+
callsign
+
commit_hash
def
query_get_repository_callsign
(
api_url
,
token
,
repository_phid
):
result
=
call_phabricator_api
(
api_url
,
token
,
"diffusion.repository.search"
,
{
"constraints[phids][0]"
:
repository_phid
},
)
return
"r"
+
result
[
0
][
"fields"
][
"callsign"
]
def
query_phabricator_instances
(
config_file
,
commit_hash
):
with
open
(
config_file
,
"r"
)
as
fd
:
instances
=
json
.
load
(
fd
)
for
api_url
,
args
in
instances
[
"hosts"
]
.
items
():
url
=
query_phabricator_instance
(
api_url
,
args
[
"token"
],
commit_hash
)
if
url
is
not
None
:
return
url
return
None
# -------------------------------------------------------------
# Gerrit
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def
query_gerrit_instance
(
instance
,
commit_hash
):
url
=
instance
+
"changes/?q="
+
commit_hash
r
=
requests
.
get
(
url
)
if
r
.
status_code
!=
200
:
print
(
r
.
status_code
)
return
None
# We can't use r.json() as currently the API starts responses
# by an extra line ")]}'"
payload
=
r
.
text
.
strip
()
.
split
(
"
\n
"
)[
-
1
]
result
=
json
.
loads
(
payload
)
if
not
result
:
return
None
change
=
result
[
0
]
return
f
"
{
instance
}
c/
{
change
[
'project'
]
}
/+/
{
change
[
'_number'
]
}
"
def
query_gerrit_instances
(
instances
,
commit_hash
):
for
instance
in
instances
:
url
=
query_gerrit_instance
(
instance
,
commit_hash
)
if
url
is
not
None
:
return
url
return
None
# -------------------------------------------------------------
# GitHub
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def
query_github_instance
(
instance
,
commit_hash
):
url
=
f
"
{
instance
}
/search/commits?q=hash:
{
commit_hash
}
"
r
=
requests
.
get
(
url
)
if
r
.
status_code
!=
200
:
return
None
commits
=
r
.
json
()[
"items"
]
if
not
commits
:
return
None
return
commits
[
0
][
"html_url"
]
# -------------------------------------------------------------
# GitLab
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def
query_gitlab_instance
(
instance
,
token
,
commit_hash
):
url
=
f
"
{
instance
}
api/v4/search?scope=commits&search=
{
commit_hash
}
"
r
=
requests
.
get
(
url
,
headers
=
{
"PRIVATE-TOKEN"
:
token
})
if
r
.
status_code
!=
200
:
return
None
commits
=
r
.
json
()
if
not
commits
:
return
None
return
commits
[
0
][
"web_url"
]
# -------------------------------------------------------------
# Configuration
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def
parse_config
():
try
:
return
yaml
.
safe_load
(
open
(
os
.
environ
[
"HOME"
]
+
"/.config/resolvehash.conf"
))
except
:
return
{}
# -------------------------------------------------------------
# Hash search wrapper
#
# I. VCS :: code review and repositories hosting
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class
VcsHashSearch
:
def
__init__
(
self
,
config
,
needle_hash
):
self
.
config
=
config
self
.
hash
=
needle_hash
def
search_phabricator
(
self
):
arrc_path
=
os
.
environ
[
"HOME"
]
+
"/.arcrc"
if
os
.
path
.
exists
(
arrc_path
):
return
query_phabricator_instances
(
arrc_path
,
self
.
hash
)
def
search_gerrit
(
self
):
if
"gerrit"
in
self
.
config
:
return
query_gerrit_instances
(
self
.
config
[
"gerrit"
],
self
.
hash
)
def
search_github
(
self
):
return
query_github_instance
(
"https://api.github.com"
,
self
.
hash
)
def
search_gitlab
(
self
):
if
"gitlab_public_token"
in
self
.
config
:
return
query_gitlab_instance
(
"https://gitlab.com/"
,
self
.
config
[
"gitlab_public_token"
],
self
.
hash
)
def
get_search_methods
(
self
):
return
[
# Strategy A. Code review systems we can autodiscover
self
.
search_phabricator
,
# Strategy B. Sources explicitly configured in configuration
self
.
search_gerrit
,
# Strategy C. Popular public hosting sites like GitHub
self
.
search_github
,
self
.
search_gitlab
,
]
def
search
(
self
):
for
method
in
self
.
get_search_methods
():
result
=
method
()
if
result
:
return
result
def
get_search_classes
():
return
[
VcsHashSearch
,
]
def
find_hash
(
config
,
needle_hash
):
for
search_class
in
get_search_classes
():
result
=
search_class
(
config
,
needle_hash
)
.
search
()
if
result
is
not
None
:
return
result
# -------------------------------------------------------------
# Application entry point
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def
run
(
needle_hash
):
result
=
find_hash
(
parse_config
(),
needle_hash
)
if
not
result
:
sys
.
exit
(
1
)
print
(
result
)
if
__name__
==
"__main__"
:
argc
=
len
(
sys
.
argv
)
if
argc
<
2
:
print
(
f
"Usage:
{
sys
.
argv
[
0
]
}
<hash>"
,
file
=
sys
.
stderr
)
sys
.
exit
(
1
)
run
(
sys
.
argv
[
1
])
File Metadata
Details
Attached
Mime Type
text/plain; charset=utf-8
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
960425
Default Alt Text
bin/resolve-hash (6 KB)
Attached To
Mode
P301 bin/resolve-hash
Attached
Detach File
Event Timeline
Log In to Comment