aboutsummaryrefslogtreecommitdiff
path: root/tools/binman/bintool.py
blob: ec30ceff74c0e8c618c29f4a52eae60d01139ce3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
# SPDX-License-Identifier: GPL-2.0+
# Copyright 2022 Google LLC
# Copyright (C) 2022 Weidmüller Interface GmbH & Co. KG
# Stefan Herbrechtsmeier <stefan.herbrechtsmeier@weidmueller.com>
#
"""Base class for all bintools

This defines the common functionality for all bintools, including running
the tool, checking its version and fetching it if needed.
"""

import collections
import glob
import importlib
import multiprocessing
import os
import shutil
import tempfile
import urllib.error

from patman import command
from patman import terminal
from patman import tools
from patman import tout

BINMAN_DIR = os.path.dirname(os.path.realpath(__file__))

# Format string for listing bintools, see also the header in list_all()
FORMAT = '%-16.16s %-12.12s %-26.26s %s'

# List of known modules, to avoid importing the module multiple times
modules = {}

# Possible ways of fetching a tool (FETCH_COUNT is number of ways)
FETCH_ANY, FETCH_BIN, FETCH_BUILD, FETCH_COUNT = range(4)

FETCH_NAMES = {
    FETCH_ANY: 'any method',
    FETCH_BIN: 'binary download',
    FETCH_BUILD: 'build from source'
    }

# Status of tool fetching
FETCHED, FAIL, PRESENT, STATUS_COUNT = range(4)

DOWNLOAD_DESTDIR = os.path.join(os.getenv('HOME'), 'bin')

class Bintool:
    """Tool which operates on binaries to help produce entry contents

    This is the base class for all bintools
    """
    # List of bintools to regard as missing
    missing_list = []

    def __init__(self, name, desc):
        self.name = name
        self.desc = desc

    @staticmethod
    def find_bintool_class(btype):
        """Look up the bintool class for bintool

        Args:
            byte: Bintool to use, e.g. 'mkimage'

        Returns:
            The bintool class object if found, else a tuple:
                module name that could not be found
                exception received
        """
        # Convert something like 'u-boot' to 'u_boot' since we are only
        # interested in the type.
        module_name = btype.replace('-', '_')
        module = modules.get(module_name)
        class_name = f'Bintool{module_name}'

        # Import the module if we have not already done so
        if not module:
            try:
                module = importlib.import_module('binman.btool.' + module_name)
            except ImportError as exc:
                try:
                    # Deal with classes which must be renamed due to conflicts
                    # with Python libraries
                    class_name = f'Bintoolbtool_{module_name}'
                    module = importlib.import_module('binman.btool.btool_' +
                                                     module_name)
                except ImportError:
                    return module_name, exc
            modules[module_name] = module

        # Look up the expected class name
        return getattr(module, class_name)

    @staticmethod
    def create(name):
        """Create a new bintool object

        Args:
            name (str): Bintool to create, e.g. 'mkimage'

        Returns:
            A new object of the correct type (a subclass of Binutil)
        """
        cls = Bintool.find_bintool_class(name)
        if isinstance(cls, tuple):
            raise ValueError("Cannot import bintool module '%s': %s" % cls)

        # Call its constructor to get the object we want.
        obj = cls(name)
        return obj

    def show(self):
        """Show a line of information about a bintool"""
        if self.is_present():
            version = self.version()
        else:
            version = '-'
        print(FORMAT % (self.name, version, self.desc,
                        self.get_path() or '(not found)'))

    @classmethod
    def set_missing_list(cls, missing_list):
        cls.missing_list = missing_list or []

    @staticmethod
    def get_tool_list(include_testing=False):
        """Get a list of the known tools

        Returns:
            list of str: names of all tools known to binman
        """
        files = glob.glob(os.path.join(BINMAN_DIR, 'btool/*'))
        names = [os.path.splitext(os.path.basename(fname))[0]
                 for fname in files]
        names = [name for name in names if name[0] != '_']
        if include_testing:
            names.append('_testing')
        return sorted(names)

    @staticmethod
    def list_all():
        """List all the bintools known to binman"""
        names = Bintool.get_tool_list()
        print(FORMAT % ('Name', 'Version', 'Description', 'Path'))
        print(FORMAT % ('-' * 15,'-' * 11, '-' * 25, '-' * 30))
        for name in names:
            btool = Bintool.create(name)
            btool.show()

    def is_present(self):
        """Check if a bintool is available on the system

        Returns:
            bool: True if available, False if not
        """
        if self.name in self.missing_list:
            return False
        return bool(self.get_path())

    def get_path(self):
        """Get the path of a bintool

        Returns:
            str: Path to the tool, if available, else None
        """
        return tools.tool_find(self.name)

    def fetch_tool(self, method, col, skip_present):
        """Fetch a single tool

        Args:
            method (FETCH_...): Method to use
            col (terminal.Color): Color terminal object
            skip_present (boo;): Skip fetching if it is already present

        Returns:
            int: Result of fetch either FETCHED, FAIL, PRESENT
        """
        def try_fetch(meth):
            res = None
            try:
                res = self.fetch(meth)
            except urllib.error.URLError as uerr:
                message = uerr.reason
                print(col.build(col.RED, f'- {message}'))

            except ValueError as exc:
                print(f'Exception: {exc}')
            return res

        if skip_present and self.is_present():
            return PRESENT
        print(col.build(col.YELLOW, 'Fetch: %s' % self.name))
        if method == FETCH_ANY:
            for try_method in range(1, FETCH_COUNT):
                print(f'- trying method: {FETCH_NAMES[try_method]}')
                result = try_fetch(try_method)
                if result:
                    break
        else:
            result = try_fetch(method)
        if not result:
            return FAIL
        if result is not True:
            fname, tmpdir = result
            dest = os.path.join(DOWNLOAD_DESTDIR, self.name)
            print(f"- writing to '{dest}'")
            shutil.move(fname, dest)
            if tmpdir:
                shutil.rmtree(tmpdir)
        return FETCHED

    @staticmethod
    def fetch_tools(method, names_to_fetch):
        """Fetch bintools from a suitable place

        This fetches or builds the requested bintools so that they can be used
        by binman

        Args:
            names_to_fetch (list of str): names of bintools to fetch

        Returns:
            True on success, False on failure
        """
        def show_status(color, prompt, names):
            print(col.build(
                color, f'{prompt}:%s{len(names):2}: %s' %
                (' ' * (16 - len(prompt)), ' '.join(names))))

        col = terminal.Color()
        skip_present = False
        name_list = names_to_fetch
        if len(names_to_fetch) == 1 and names_to_fetch[0] in ['all', 'missing']:
            name_list = Bintool.get_tool_list()
            if names_to_fetch[0] == 'missing':
                skip_present = True
            print(col.build(col.YELLOW,
                            'Fetching tools:      %s' % ' '.join(name_list)))
        status = collections.defaultdict(list)
        for name in name_list:
            btool = Bintool.create(name)
            result = btool.fetch_tool(method, col, skip_present)
            status[result].append(name)
            if result == FAIL:
                if method == FETCH_ANY:
                    print('- failed to fetch with all methods')
                else:
                    print(f"- method '{FETCH_NAMES[method]}' is not supported")

        if len(name_list) > 1:
            if skip_present:
                show_status(col.GREEN, 'Already present', status[PRESENT])
            show_status(col.GREEN, 'Tools fetched', status[FETCHED])
            if status[FAIL]:
                show_status(col.RED, 'Failures', status[FAIL])
        return not status[FAIL]

    def run_cmd_result(self, *args, binary=False, raise_on_error=True):
        """Run the bintool using command-line arguments

        Args:
            args (list of str): Arguments to provide, in addition to the bintool
                name
            binary (bool): True to return output as bytes instead of str
            raise_on_error (bool): True to raise a ValueError exception if the
                tool returns a non-zero return code

        Returns:
            CommandResult: Resulting output from the bintool, or None if the
                tool is not present
        """
        if self.name in self.missing_list:
            return None
        name = os.path.expanduser(self.name)  # Expand paths containing ~
        all_args = (name,) + args
        env = tools.get_env_with_path()
        tout.detail(f"bintool: {' '.join(all_args)}")
        result = command.run_pipe(
            [all_args], capture=True, capture_stderr=True, env=env,
            raise_on_error=False, binary=binary)

        if result.return_code:
            # Return None if the tool was not found. In this case there is no
            # output from the tool and it does not appear on the path. We still
            # try to run it (as above) since RunPipe() allows faking the tool's
            # output
            if not any([result.stdout, result.stderr, tools.tool_find(name)]):
                tout.info(f"bintool '{name}' not found")
                return None
            if raise_on_error:
                tout.info(f"bintool '{name}' failed")
                raise ValueError("Error %d running '%s': %s" %
                                (result.return_code, ' '.join(all_args),
                                result.stderr or result.stdout))
        if result.stdout:
            tout.debug(result.stdout)
        if result.stderr:
            tout.debug(result.stderr)
        return result

    def run_cmd(self, *args, binary=False):
        """Run the bintool using command-line arguments

        Args:
            args (list of str): Arguments to provide, in addition to the bintool
                name
            binary (bool): True to return output as bytes instead of str

        Returns:
            str or bytes: Resulting stdout from the bintool
        """
        result = self.run_cmd_result(*args, binary=binary)
        if result:
            return result.stdout

    @classmethod
    def build_from_git(cls, git_repo, make_target, bintool_path):
        """Build a bintool from a git repo

        This clones the repo in a temporary directory, builds it with 'make',
        then returns the filename of the resulting executable bintool

        Args:
            git_repo (str): URL of git repo
            make_target (str): Target to pass to 'make' to build the tool
            bintool_path (str): Relative path of the tool in the repo, after
                build is complete

        Returns:
            tuple:
                str: Filename of fetched file to copy to a suitable directory
                str: Name of temp directory to remove, or None
            or None on error
        """
        tmpdir = tempfile.mkdtemp(prefix='binmanf.')
        print(f"- clone git repo '{git_repo}' to '{tmpdir}'")
        tools.run('git', 'clone', '--depth', '1', git_repo, tmpdir)
        print(f"- build target '{make_target}'")
        tools.run('make', '-C', tmpdir, '-j', f'{multiprocessing.cpu_count()}',
                  make_target)
        fname = os.path.join(tmpdir, bintool_path)
        if not os.path.exists(fname):
            print(f"- File '{fname}' was not produced")
            return None
        return fname, tmpdir

    @classmethod
    def fetch_from_url(cls, url):
        """Fetch a bintool from a URL

        Args:
            url (str): URL to fetch from

        Returns:
            tuple:
                str: Filename of fetched file to copy to a suitable directory
                str: Name of temp directory to remove, or None
        """
        fname, tmpdir = tools.download(url)
        tools.run('chmod', 'a+x', fname)
        return fname, tmpdir

    @classmethod
    def fetch_from_drive(cls, drive_id):
        """Fetch a bintool from Google drive

        Args:
            drive_id (str): ID of file to fetch. For a URL of the form
            'https://drive.google.com/file/d/xxx/view?usp=sharing' the value
            passed here should be 'xxx'

        Returns:
            tuple:
                str: Filename of fetched file to copy to a suitable directory
                str: Name of temp directory to remove, or None
        """
        url = f'https://drive.google.com/uc?export=download&id={drive_id}'
        return cls.fetch_from_url(url)

    @classmethod
    def apt_install(cls, package):
        """Install a bintool using the 'aot' tool

        This requires use of servo so may request a password

        Args:
            package (str): Name of package to install

        Returns:
            True, assuming it completes without error
        """
        args = ['sudo', 'apt', 'install', '-y', package]
        print('- %s' % ' '.join(args))
        tools.run(*args)
        return True

    @staticmethod
    def WriteDocs(modules, test_missing=None):
        """Write out documentation about the various bintools to stdout

        Args:
            modules: List of modules to include
            test_missing: Used for testing. This is a module to report
                as missing
        """
        print('''.. SPDX-License-Identifier: GPL-2.0+

Binman bintool Documentation
============================

This file describes the bintools (binary tools) supported by binman. Bintools
are binman's name for external executables that it runs to generate or process
binaries. It is fairly easy to create new bintools. Just add a new file to the
'btool' directory. You can use existing bintools as examples.


''')
        modules = sorted(modules)
        missing = []
        for name in modules:
            module = Bintool.find_bintool_class(name)
            docs = getattr(module, '__doc__')
            if test_missing == name:
                docs = None
            if docs:
                lines = docs.splitlines()
                first_line = lines[0]
                rest = [line[4:] for line in lines[1:]]
                hdr = 'Bintool: %s: %s' % (name, first_line)
                print(hdr)
                print('-' * len(hdr))
                print('\n'.join(rest))
                print()
                print()
            else:
                missing.append(name)

        if missing:
            raise ValueError('Documentation is missing for modules: %s' %
                             ', '.join(missing))

    # pylint: disable=W0613
    def fetch(self, method):
        """Fetch handler for a bintool

        This should be implemented by the base class

        Args:
            method (FETCH_...): Method to use

        Returns:
            tuple:
                str: Filename of fetched file to copy to a suitable directory
                str: Name of temp directory to remove, or None
            or True if the file was fetched and already installed
            or None if no fetch() implementation is available

        Raises:
            Valuerror: Fetching could not be completed
        """
        print(f"No method to fetch bintool '{self.name}'")
        return False

    # pylint: disable=R0201
    def version(self):
        """Version handler for a bintool

        This should be implemented by the base class

        Returns:
            str: Version string for this bintool
        """
        return 'unknown'

class BintoolPacker(Bintool):
    """Tool which compression / decompression entry contents

    This is a bintools base class for compression / decompression packer

    Properties:
        name: Name of packer tool
        compression: Compression type (COMPRESS_...), value of 'name' property
            if none
        compress_args: List of positional args provided to tool for compress,
            ['--compress'] if none
        decompress_args: List of positional args provided to tool for
            decompress, ['--decompress'] if none
        fetch_package: Name of the tool installed using the apt, value of 'name'
            property if none
        version_regex: Regular expressions to extract the version from tool
            version output,  '(v[0-9.]+)' if none
    """
    def __init__(self, name, compression=None, compress_args=None,
                 decompress_args=None, fetch_package=None,
                 version_regex=r'(v[0-9.]+)'):
        desc = '%s compression' % (compression if compression else name)
        super().__init__(name, desc)
        if compress_args is None:
            compress_args = ['--compress']
        self.compress_args = compress_args
        if decompress_args is None:
            decompress_args = ['--decompress']
        self.decompress_args = decompress_args
        if fetch_package is None:
            fetch_package = name
        self.fetch_package = fetch_package
        self.version_regex = version_regex

    def compress(self, indata):
        """Compress data

        Args:
            indata (bytes): Data to compress

        Returns:
            bytes: Compressed data
        """
        with tempfile.NamedTemporaryFile(prefix='comp.tmp',
                                         dir=tools.get_output_dir()) as tmp:
            tools.write_file(tmp.name, indata)
            args = self.compress_args + ['--stdout', tmp.name]
            return self.run_cmd(*args, binary=True)

    def decompress(self, indata):
        """Decompress data

        Args:
            indata (bytes): Data to decompress

        Returns:
            bytes: Decompressed data
        """
        with tempfile.NamedTemporaryFile(prefix='decomp.tmp',
                                         dir=tools.get_output_dir()) as inf:
            tools.write_file(inf.name, indata)
            args = self.decompress_args + ['--stdout', inf.name]
            return self.run_cmd(*args, binary=True)

    def fetch(self, method):
        """Fetch handler

        This installs the gzip package using the apt utility.

        Args:
            method (FETCH_...): Method to use

        Returns:
            True if the file was fetched and now installed, None if a method
            other than FETCH_BIN was requested

        Raises:
            Valuerror: Fetching could not be completed
        """
        if method != FETCH_BIN:
            return None
        return self.apt_install(self.fetch_package)

    def version(self):
        """Version handler

        Returns:
            str: Version number
        """
        import re

        result = self.run_cmd_result('-V')
        out = result.stdout.strip()
        if not out:
            out = result.stderr.strip()
        if not out:
            return super().version()

        m_version = re.search(self.version_regex, out)
        return m_version.group(1) if m_version else out