-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathscanner.py
executable file
·271 lines (229 loc) · 9.74 KB
/
scanner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2023 Technology Innovation Institute (TII)
# SPDX-License-Identifier: Apache-2.0
"""Summarize nixpkgs meta-attributes"""
import json
import pathlib
import re
from tempfile import NamedTemporaryFile
import pandas as pd
from common.utils import LOG, LOG_SPAM, df_from_csv_file, df_to_csv_file, exec_cmd
###############################################################################
class NixMetaScanner:
    """Scan nixpkgs meta-info and keep the result as a pandas dataframe"""

    def __init__(self):
        # Dataframe of package meta-info; None until scan() or an
        # appending to_csv() call populates it
        self.df_meta = None

    def scan(self, nixref):
        """
        Scan nixpkgs meta-info using nixpkgs version pinned in nixref;
        nixref can be a nix store path or flakeref.

        Returns None in all cases; on success the result is stored in
        self.df_meta, otherwise self.df_meta is left untouched.
        """
        nixpkgs_path = nixref_to_nixpkgs_path(nixref)
        if not nixpkgs_path:
            return
        if not nixpkgs_path.exists():
            LOG.warning("Nixpkgs not in nix store: %s", nixpkgs_path.as_posix())
            return
        LOG.debug("nixpkgs: %s", nixpkgs_path)
        self._read_nixpkgs_meta(nixpkgs_path)

    def to_csv(self, csv_path, append=False):
        """
        Export meta-info to a csv file.

        With append=True and an existing csv_path, the rows already in the
        file are merged with the scanned rows (duplicates removed) before
        the file is rewritten. Nothing is written if there is no data.
        """
        csv_path = pathlib.Path(csv_path)
        if append and csv_path.exists():
            df = df_from_csv_file(csv_path)
            # pd.concat silently skips a None entry, so this also covers
            # the case where nothing has been scanned yet
            self.df_meta = pd.concat([self.df_meta, df], ignore_index=True)
            self._drop_duplicates()
        if self.df_meta is None or self.df_meta.empty:
            LOG.info("Nothing to output")
            return
        csv_path.parent.mkdir(parents=True, exist_ok=True)
        df_to_csv_file(self.df_meta, csv_path.absolute().as_posix())

    def to_df(self):
        """Return meta-info as dataframe (None if nothing was scanned)"""
        return self.df_meta

    def _read_nixpkgs_meta(self, nixpkgs_path):
        """Dump the meta-info of nixpkgs_path with nix-env and parse it"""
        prefix = "nixmeta_"
        suffix = ".json"
        with NamedTemporaryFile(delete=True, prefix=prefix, suffix=suffix) as f:
            # Build argv as a list instead of splitting a formatted string,
            # so a path containing whitespace stays a single argument
            cmd = ["nix-env", "-qa", "--meta", "--json", "-f"]
            cmd.append(nixpkgs_path.as_posix())
            exec_cmd(cmd, stdout=f)
            LOG.debug("Generated meta.json: %s", f.name)
            # Parse inside the with-block: the file is removed on exit
            self.df_meta = _parse_json_metadata(f.name)
            self._drop_duplicates()

    def _drop_duplicates(self):
        """Normalize self.df_meta to strings and drop duplicate packages"""
        if self.df_meta is None or self.df_meta.empty:
            # Nothing to de-duplicate; an empty frame may not even have
            # the key columns used below
            return
        # Record missing cells before the string conversion: afterwards they
        # would be indistinguishable from the literal text "nan"/"None"
        missing = self.df_meta.isna().to_numpy()
        df = self.df_meta.astype(str).mask(missing, "")
        uids = [
            "name",
            "version",
            "meta_license_short",
            "meta_license_spdxid",
            "meta_homepage",
        ]
        df = df.sort_values(by=uids)
        self.df_meta = df.drop_duplicates(subset=uids, keep="last")
###############################################################################
def nixref_to_nixpkgs_path(flakeref):
    """
    Resolve flakeref to the absolute path of the nixpkgs it pins.

    Returns None if flakeref is empty or the nixpkgs metadata can not be
    determined.
    """
    if not flakeref:
        return None
    LOG.debug("Finding meta-info for nixpkgs pinned in nixref: %s", flakeref)
    # Drop a trailing target specifier, i.e. the '#' and everything after it
    target_match = re.match(r"([^#]+)#", flakeref)
    if target_match is not None:
        flakeref = target_match.group(1)
        LOG.debug("Stripped target specifier: %s", flakeref)
    metadata = _get_flake_metadata(flakeref)
    if _is_nixpkgs_metadata(metadata):
        # flakeref itself refers to nixpkgs
        return pathlib.Path(metadata["path"]).absolute()
    # Otherwise look up the nixpkgs revision locked by the given flake
    LOG.debug("non-nixpkgs flakeref: %s", flakeref)
    pinned_flakeref = _get_nixpkgs_flakeref(metadata)
    if not pinned_flakeref:
        LOG.warning("Failed parsing locked nixpkgs: %s", flakeref)
        return None
    LOG.log(LOG_SPAM, "using nixpkgs_flakeref: %s", pinned_flakeref)
    metadata = _get_flake_metadata(pinned_flakeref)
    if not _is_nixpkgs_metadata(metadata):
        LOG.warning("Failed reading nixpkgs metadata: %s", flakeref)
        return None
    return pathlib.Path(metadata["path"]).absolute()
def _get_flake_metadata(flakeref):
    """
    Return json object detailing the output of nix flake metadata
    for given flakeref.

    Returns None (after logging a warning) if the nix command fails or
    its output can not be parsed as json.
    """
    # Strip possible nixpkgs= prefix to support cases where flakeref is
    # given the NIX_PATH environment variable
    m_nixpkgs = re.match(r"nixpkgs=([^:\s]+)", flakeref)
    if m_nixpkgs:
        flakeref = m_nixpkgs.group(1)
    # Read nix flake metadata as json. The argv is built as a list rather
    # than by splitting a formatted string, so a flakeref containing
    # whitespace is passed to nix as one argument.
    cmd = ["nix", "flake", "metadata", flakeref, "--json"]
    cmd += ["--extra-experimental-features", "flakes"]
    cmd += ["--extra-experimental-features", "nix-command"]
    ret = exec_cmd(cmd, raise_on_error=False, return_error=True)
    if ret is None or ret.returncode != 0:
        LOG.warning("Failed reading flake metadata: %s", flakeref)
        return None
    try:
        meta_json = json.loads(ret.stdout)
    except (TypeError, ValueError):
        # Treat unparseable output like a failed command instead of raising
        LOG.warning("Failed parsing flake metadata: %s", flakeref)
        return None
    LOG.log(LOG_SPAM, "%s", meta_json)
    return meta_json
def _is_nixpkgs_metadata(meta_json):
    """Tell whether meta_json is the flake metadata of nixpkgs itself"""
    nixpkgs_description = "A collection of packages for the Nix package manager"
    try:
        # Both accepted shapes require a "path" entry
        if "path" not in meta_json:
            return False
        # A nix store path to a nixpkgs source directory is recognized
        # by its description
        if (
            "description" in meta_json
            and meta_json["description"] == nixpkgs_description
        ):
            return True
        # Otherwise the locked input has to be NixOS/nixpkgs
        locked = meta_json["locked"]
        return locked["owner"] == "NixOS" and locked["repo"] == "nixpkgs"
    except (KeyError, TypeError):
        return False
def _get_flake_nixpkgs_val(meta_json, key):
    """Look up key in the locked nixpkgs node of the flake metadata"""
    node = meta_json
    try:
        # Descend locks -> nodes -> nixpkgs -> locked -> key
        for step in ("locks", "nodes", "nixpkgs", "locked", key):
            node = node[step]
    except (KeyError, TypeError):
        return None
    return node
def _get_flake_nixpkgs_obj(meta_json):
    """Return the locked nixpkgs node of the flake metadata, or None"""
    node = meta_json
    try:
        # Descend locks -> nodes -> nixpkgs -> locked
        for step in ("locks", "nodes", "nixpkgs", "locked"):
            node = node[step]
    except (KeyError, TypeError):
        return None
    return node
def _get_nixpkgs_flakeref_github(meta_json):
    """Return a github flakeref for the locked nixpkgs, or None"""
    fields = [
        _get_flake_nixpkgs_val(meta_json, name) for name in ("owner", "repo", "rev")
    ]
    # All three locked values are needed to build the flakeref
    if any(field is None for field in fields):
        LOG.debug(
            "owner, repo, or rev not found: %s", _get_flake_nixpkgs_obj(meta_json)
        )
        return None
    owner, repo, rev = fields
    return f"github:{owner}/{repo}?rev={rev}"
def _get_nixpkgs_flakeref_git(meta_json):
    """Return a git flakeref for the locked nixpkgs, or None"""
    fields = [
        _get_flake_nixpkgs_val(meta_json, name) for name in ("url", "rev", "ref")
    ]
    # url, rev and ref all have to be present in the locked node
    if any(field is None for field in fields):
        LOG.debug("url, rev, or ref not found: %s", _get_flake_nixpkgs_obj(meta_json))
        return None
    url, rev, ref = fields
    return f"git+{url}?ref={ref}&rev={rev}"
def _get_nixpkgs_flakeref_path(meta_json):
    """Return a path flakeref for the locked nixpkgs, or None"""
    locked_path = _get_flake_nixpkgs_val(meta_json, "path")
    if locked_path is not None:
        return f"path:{locked_path}"
    LOG.debug("path not found: %s", _get_flake_nixpkgs_obj(meta_json))
    return None
def _get_nixpkgs_flakeref_tarball(meta_json):
    """Return the locked tarball url as the nixpkgs flakeref, or None"""
    locked_url = _get_flake_nixpkgs_val(meta_json, "url")
    if locked_url is not None:
        # The url is used as the flakeref as-is
        return str(locked_url)
    LOG.debug("url not found: %s", _get_flake_nixpkgs_obj(meta_json))
    return None
def _get_nixpkgs_flakeref(meta_json):
    """Return a flakeref for the nixpkgs locked in the flake metadata"""
    locked_type = _get_flake_nixpkgs_val(meta_json, "type")
    # Supported locked types, each paired with the function that builds
    # the flakeref for it
    builders = (
        ("github", _get_nixpkgs_flakeref_github),
        ("git", _get_nixpkgs_flakeref_git),
        ("path", _get_nixpkgs_flakeref_path),
        ("tarball", _get_nixpkgs_flakeref_tarball),
    )
    for type_name, build_flakeref in builders:
        if locked_type == type_name:
            return build_flakeref(meta_json)
    LOG.debug("Unsupported nixpkgs locked type: %s", locked_type)
    return None
def _parse_meta_entry(meta, key):
    """
    Collect the values stored under key in a metadata entry.

    Dictionaries are searched for key, lists are processed element by
    element, and any other value is converted to a string. Non-empty
    results are joined with ';'.
    """
    if isinstance(meta, dict):
        parts = [_parse_meta_entry(meta.get(key, ""), key)]
    elif isinstance(meta, list):
        parts = [_parse_meta_entry(element, key) for element in meta]
    else:
        # Leaf value
        return str(meta)
    return ";".join(part for part in parts if part)
def _parse_json_metadata(json_filename):
    """
    Parse package metadata from the specified json file.

    The file is expected to hold a json object mapping to one entry per
    package. Returns a dataframe of strings with one row per package; the
    columns are always present, even when the file lists no packages.
    """
    with open(json_filename, "r", encoding="utf-8") as inf:
        LOG.debug('Loading meta-info from "%s"', json_filename)
        json_dict = json.load(inf)
    # Create every column up front so that an empty input still yields a
    # dataframe with the expected columns
    columns = (
        "name",
        "pname",
        "version",
        "meta_homepage",
        "meta_unfree",
        "meta_description",
        "meta_license_short",
        "meta_license_spdxid",
        "meta_maintainers_email",
    )
    dict_selected = {column: [] for column in columns}
    for pkg in json_dict.values():
        # generic package info
        dict_selected["name"].append(pkg.get("name", ""))
        dict_selected["pname"].append(pkg.get("pname", ""))
        dict_selected["version"].append(pkg.get("version", ""))
        # meta; a missing or null "meta" is treated as empty
        meta = pkg.get("meta") or {}
        homepage = _parse_meta_entry(meta, key="homepage")
        dict_selected["meta_homepage"].append(homepage)
        dict_selected["meta_unfree"].append(meta.get("unfree", ""))
        dict_selected["meta_description"].append(meta.get("description", ""))
        # meta.license
        meta_license = meta.get("license", {})
        license_short = _parse_meta_entry(meta_license, key="shortName")
        dict_selected["meta_license_short"].append(license_short)
        license_spdx = _parse_meta_entry(meta_license, key="spdxId")
        dict_selected["meta_license_spdxid"].append(license_spdx)
        # meta.maintainers
        meta_maintainers = meta.get("maintainers", {})
        emails = _parse_meta_entry(meta_maintainers, key="email")
        dict_selected["meta_maintainers_email"].append(emails)
    return pd.DataFrame(dict_selected).astype(str)
###############################################################################