diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..c72ae90 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,44 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +bw-menu is a rofi/fzf frontend for Bitwarden via the `rbw` CLI. It lets users interactively search vault entries, select one, and copy a field (password, username, or TOTP) to the clipboard. It maintains a selection history so recent entries appear first. + +## Commands + +```bash +uv sync # install dependencies and set up virtualenv +bw-menu select # run with rofi (default) +bw-menu select --selector fzf # run with fzf +``` + +There are no tests or linting configured. + +## Architecture + +**Entry point:** `bw-menu` console script → `src/bw_menu/__main__.py:main()` + +**Two subcommands:** +- `select` — interactive vault entry selection via rofi or fzf +- `history` — list/get from recently selected entries + +**Key data flow for `select`:** +1. `cli.py` parses args → `__main__.py` creates a selector via `selector.create_selector()` +2. Selector fetches entries from `rbw.list_entries()` and history from `history.load()` +3. History entries appear first (marked distinctly per selector), then remaining entries sorted by folder/name +4. On selection, `rbw.get_field(entry, field)` retrieves the secret, entry is added to history +5. Result is printed or copied via `clipboard.copy()` + +**Rofi script mode:** `RofiSelector` uses rofi's script mode protocol — rofi invokes `bw-menu select` repeatedly. `ROFI_RETV` env var distinguishes initial list generation (0) from selection handling (1). Entry identity survives the round-trip via `ROFI_INFO` using ASCII record separator (`\x1e`) encoding. + +**External tools (called via subprocess):** `rbw` (vault access), `rofi`/`fzf` (selection UI), `wl-copy`/`xclip` (clipboard). + +**Python dependency:** `pyyaml` (for config parsing). All external tool interaction is via subprocess calls. + +## Build System + +- `uv` for environment/package management +- `hatchling` as build backend +- Source layout: `src/bw_menu/` diff --git a/src/bitwarden.py b/src/bitwarden.py deleted file mode 100644 index 7ac35f6..0000000 --- a/src/bitwarden.py +++ /dev/null @@ -1,443 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -# Copyright: (c) 2022, Linuxfabrik, Zurich, Switzerland, https://www.linuxfabrik.ch -# The Unlicense (see LICENSE or https://unlicense.org/) - -from __future__ import absolute_import, division, print_function - -import email.encoders -import email.mime.application -import email.mime.multipart -import email.mime.nonmultipart -import email.parser -import email.utils -import json -import mimetypes -import os -import urllib.parse -from urllib.error import HTTPError, URLError - -from ansible.module_utils.common.collections import Mapping -from ansible.module_utils.six import PY2, PY3, string_types -from ansible.module_utils.six.moves import cStringIO - -try: - import email.policy -except ImportError: - # Py2 - import email.generator - -from ansible.module_utils._text import to_bytes, to_native, to_text -from ansible.module_utils.urls import (ConnectionError, SSLValidationError, - open_url) - - -def prepare_multipart_no_base64(fields): - """Taken from ansible.module_utils.urls, but adjusted to not no encoding as the bitwarden API does not work with that - (even though it should according to the RFC, Content-Transfer-Encoding is deprecated but not removed). - See https://github.com/ansible/ansible/issues/73621 - - Takes a mapping, and prepares a multipart/form-data body - - :arg fields: Mapping - :returns: tuple of (content_type, body) where ``content_type`` is - the ``multipart/form-data`` ``Content-Type`` header including - ``boundary`` and ``body`` is the prepared bytestring body - - Payload content from a file will be base64 encoded and will include - the appropriate ``Content-Transfer-Encoding`` and ``Content-Type`` - headers. - - Example: - { - "file1": { - "filename": "/bin/true", - "mime_type": "application/octet-stream" - }, - "file2": { - "content": "text based file content", - "filename": "fake.txt", - "mime_type": "text/plain", - }, - "text_form_field": "value" - } - """ - - if not isinstance(fields, Mapping): - raise TypeError( - 'Mapping is required, cannot be type %s' % fields.__class__.__name__ - ) - - m = email.mime.multipart.MIMEMultipart('form-data') - for field, value in sorted(fields.items()): - if isinstance(value, string_types): - main_type = 'text' - sub_type = 'plain' - content = value - filename = None - elif isinstance(value, Mapping): - filename = value.get('filename') - content = value.get('content') - if not any((filename, content)): - raise ValueError('at least one of filename or content must be provided') - - mime = value.get('mime_type') - if not mime: - try: - mime = mimetypes.guess_type(filename or '', strict=False)[0] or 'application/octet-stream' - except Exception: - mime = 'application/octet-stream' - main_type, sep, sub_type = mime.partition('/') - else: - raise TypeError( - 'value must be a string, or mapping, cannot be type %s' % value.__class__.__name__ - ) - - if not content and filename: - with open(to_bytes(filename, errors='surrogate_or_strict'), 'rb') as f: - part = email.mime.application.MIMEApplication(f.read(), _encoder=email.encoders.encode_noop) - del part['Content-Type'] - part.add_header('Content-Type', '%s/%s' % (main_type, sub_type)) - else: - part = email.mime.nonmultipart.MIMENonMultipart(main_type, sub_type) - part.set_payload(to_bytes(content)) - - part.add_header('Content-Disposition', 'form-data') - del part['MIME-Version'] - part.set_param( - 'name', - field, - header='Content-Disposition' - ) - if filename: - part.set_param( - 'filename', - to_native(os.path.basename(filename)), - header='Content-Disposition' - ) - - m.attach(part) - - if PY3: - # Ensure headers are not split over multiple lines - # The HTTP policy also uses CRLF by default - b_data = m.as_bytes(policy=email.policy.HTTP) - else: - # Py2 - # We cannot just call ``as_string`` since it provides no way - # to specify ``maxheaderlen`` - fp = cStringIO() # cStringIO seems to be required here - # Ensure headers are not split over multiple lines - g = email.generator.Generator(fp, maxheaderlen=0) - g.flatten(m) - # ``fix_eols`` switches from ``\n`` to ``\r\n`` - b_data = email.utils.fix_eols(fp.getvalue()) - del m - - headers, sep, b_content = b_data.partition(b'\r\n\r\n') - del b_data - - if PY3: - parser = email.parser.BytesHeaderParser().parsebytes - else: - # Py2 - parser = email.parser.HeaderParser().parsestr - - return ( - parser(headers)['content-type'], # Message converts to native strings - b_content - ) - - -class BitwardenException(Exception): - pass - - -class Bitwarden(object): - # https://bitwarden.com/help/vault-management-api - - def __init__(self, hostname='127.0.0.1', port=8087): - self._base_url = 'http://%s:%s' % (hostname, port) - - def _api_call(self, url_path, method='GET', body=None, body_format='json'): - url = '%s/%s' % (self._base_url, url_path) - - headers = {} - if body: - if body_format == 'json': - body = json.dumps(body) - headers['Content-Type'] = 'application/json' - elif body_format == 'form-multipart': - try: - content_type, body = prepare_multipart_no_base64(body) - except (TypeError, ValueError) as e: - BitwardenException('failed to parse body as form-multipart: %s' % to_native(e)) - headers['Content-Type'] = content_type - - # mostly taken from ansible.builtin.url lookup plugin - try: - response = open_url(url, method=method, data=body, headers=headers) - except HTTPError as e: - raise BitwardenException("Received HTTP error for %s : %s" % (url, to_native(e))) - except URLError as e: - raise BitwardenException("Failed lookup url for %s : %s" % (url, to_native(e))) - except SSLValidationError as e: - raise BitwardenException("Error validating the server's certificate for %s: %s" % (url, to_native(e))) - except ConnectionError as e: - raise BitwardenException("Error connecting to %s: %s" % (url, to_native(e))) - - try: - result = json.loads(to_text(response.read())) - except json.decoder.JSONDecodeError as e: - raise BitwardenException('Unable to load JSON: %s' % (to_native(e))) - - if not result.get('success'): - raise BitwardenException('API call failed: %s' % (result.get('data'))) - - return result - - - def is_unlocked(self): - """Check if the Bitwarden vault is unlocked. - """ - result = self._api_call('status') - return result['data']['template']['status'] == 'unlocked' - - - def sync(self): - """Pull the latest vault data from server. - """ - return self._api_call('sync', method='POST') - - - def get_items(self, name, username=None, folder_id=None, collection_id=None, organisation_id=None): - """Search for items in Bitwarden. Returns a list of the items that *exactly* matches all the parameters. - - A complete object: - { - "object": "item", - "id": "60020baa-e876-4fd4-b5bc-259b5e6389a8", - "organizationId": "44906ecb-b307-47a5-92b4-a097745592ed", - "folderId": null, - "type": 1, - "reprompt": 0, - "name": "myhost - purpose", - "notes": "Generated by Ansible.", - "favorite": false, - "login": { - "uris": [ - { - "match": null, - "uri": "https://www.example.com" - } - ], - "username": "username", - "password": "password", - "totp": null, - "passwordRevisionDate": null - }, - "collectionIds": [ - "153e991a-a56f-4e5d-9dea-c13b9e693fc4" - ], - "revisionDate": "2022-06-26T06:00:00.000Z" - } - """ - - params = urllib.parse.urlencode( - { - 'search': name, - }, - quote_via=urllib.parse.quote, - ) - result = self._api_call('list/object/items?%s' % (params)) - - # make sure that all the given parameters exactly match the requested one, as `bw` is not that precise (only performs a search) - # we are not using the filter parameters of the `bw` utility, as they perform an OR operation, but we want AND - matching_items = [] - for item in result['data']['data']: - if item['name'] == name \ - and (item['login']['username'] == username) \ - and (item['folderId'] == folder_id) \ - and ( - # cover case if collectionIds is an empty list - (collection_id is None and not item.get('collectionIds')) \ - or \ - (collection_id in item.get('collectionIds', [])) \ - ) \ - and (item['organizationId'] == organisation_id): - matching_items.append(item) - - return matching_items - - - def get_all_items(self): - """Get a list with all item from Bitwarden. - """ - - result = self._api_call('list/object/items') - return result['data']['data'] - - - def get_item_by_id(self, item_id): - """Get an item by ID from Bitwarden. Returns the item or None. Throws an exception if the id leads to unambiguous results. - """ - - result = self._api_call('object/item/%s' % (item_id)) - return result['data'] - - - def get_collections(self): - """Get a list with all collections from Bitwarden. - """ - - result = self._api_call('list/object/collections') - return result['data']['data'] - - - def get_folders(self): - """Get a list with all folders from Bitwarden. - """ - - result = self._api_call('list/object/folders') - return result['data']['data'] - - - def generate(self, password_length=40, - password_uppercase=True, password_lowercase=True, - password_numeric=True, password_special=False): - """Return a generated password. - """ - - params = 'length=%i%s%s%s%s' % ( - password_length, - '&uppercase' if password_uppercase else '', - '&lowercase' if password_lowercase else '', - '&number' if password_numeric else '', - '&special' if password_special else '', - ) - - result = self._api_call('generate?%s' % (params)) - return result['data']['data'] - - - def get_template_item_login_uri(self, uris): - """Get an item.login.uri object from the vault. - - A complete object: - - { - "match": null, - "uri": "https://google.com" - } - """ - login_uris = [] - if uris: - # To create uris, fetch the JSON structure for that. - result = self._api_call('object/template/item.login.uri') - template = result['data']['template'] - for uri in uris: - login_uri = template.copy() # make sure we are not editing the same object repeatedly - login_uri['uri'] = uri - login_uris.append(login_uri) - - return login_uris - - - def get_template_item_login(self, username=None, password=None, login_uris=None): - """Get an item.login object from the vault. - - A complete object: - - { - "uris": [], - "username": "jdoe", - "password": "myp@ssword123", - "totp": "JBSWY3DPEHPK3PXP" - } - """ - # To create a login item, fetch the JSON structure for that. - result = self._api_call('object/template/item.login') - login = result['data']['template'] - login['password'] = password - login['totp'] = '' - login['uris'] = login_uris or [] - login['username'] = username - - return login - - - def get_template_item(self, name, login=None, notes=None, organization_id=None, collection_ids=None, folder_id=None): - """Get an item.login object from the vault. - - A complete item object: - - { - "organizationId": null, - "collectionIds": null, - "folderId": null, - "type": 1, - "name": "Item name", - "notes": "Some notes about this item.", - "favorite": false, - "fields": [], - "login": null, - "secureNote": null, - "card": null, - "identity": null, - "reprompt": 0 - } - """ - # To create an item later on, fetch the item JSON structure, and fill in the appropriate - # values. - result = self._api_call('object/template/item') - item = result['data']['template'] - item['collectionIds'] = collection_ids - item['folderId'] = folder_id - item['login'] = login - item['name'] = name - item['notes'] = notes - item['organizationId'] = organization_id - - return item - - - def create_item(self, item): - """Creates an item object in Bitwarden. - """ - result = self._api_call('object/item', method='POST', body=item) - return result['data'] - - - def edit_item(self, item, item_id): - """Edits an item object in Bitwarden. - """ - result = self._api_call('object/item/%s' % (item_id), method='PUT', body=item) - return result['data'] - - - def add_attachment(self, item_id, attachment_path): - """Adds the file at `attachment_path` to the item specified by `item_id` - """ - - body = { - 'file': { - 'filename': attachment_path, - }, - } - result = self._api_call('attachment?itemId=%s' % (item_id), method='POST', body=body, body_format='form-multipart') - return result - - @staticmethod - def get_pretty_name(name, hostname=None, purpose=None): - """create a nice name for the item if none is given - schemes: - * hostname - purpose (for example "app4711 - MariaDB") - * hostname (for example "app4711") - """ - if not name: - name = hostname - if purpose: - name += ' - {}'.format(purpose) - - return name diff --git a/src/bw_menu/cli.py b/src/bw_menu/cli.py index f9711de..a69140f 100644 --- a/src/bw_menu/cli.py +++ b/src/bw_menu/cli.py @@ -1,8 +1,10 @@ import argparse +from importlib.metadata import version def parse_args(): parser = argparse.ArgumentParser(prog="bw-menu") + parser.add_argument("--version", action="version", version=f"%(prog)s {version('bw-menu')}") subparsers = parser.add_subparsers(dest="command", required=True) # select subcommand diff --git a/src/bw_menu/config.py b/src/bw_menu/config.py index fff1085..9fd6756 100644 --- a/src/bw_menu/config.py +++ b/src/bw_menu/config.py @@ -1,4 +1,5 @@ import os +import sys from dataclasses import dataclass, field from pathlib import Path @@ -6,6 +7,7 @@ import yaml DEFAULT_KEYBINDINGS = {"Return": "password", "Control+Return": "username"} +VALID_FIELDS = {"password", "username", "totp"} @dataclass @@ -26,10 +28,17 @@ def load() -> Config: try: data = yaml.safe_load(path.read_text()) if not isinstance(data, dict) or "keybindings" not in data: + print(f"Warning: config file {path} has invalid structure, using defaults", file=sys.stderr) return Config() kb = data["keybindings"] if not isinstance(kb, dict) or not kb: + print(f"Warning: config file {path} has invalid keybindings, using defaults", file=sys.stderr) return Config() - return Config(keybindings={str(k): str(v) for k, v in kb.items()}) + keybindings = {str(k): str(v) for k, v in kb.items()} + invalid = {v for v in keybindings.values() if v not in VALID_FIELDS} + if invalid: + raise ValueError(f"Invalid keybinding field(s): {', '.join(sorted(invalid))}. Valid fields: {', '.join(sorted(VALID_FIELDS))}") + return Config(keybindings=keybindings) except yaml.YAMLError: + print(f"Warning: config file {path} could not be parsed, using defaults", file=sys.stderr) return Config() diff --git a/src/bw_menu/history.py b/src/bw_menu/history.py index c3e0574..6a9c378 100644 --- a/src/bw_menu/history.py +++ b/src/bw_menu/history.py @@ -29,6 +29,7 @@ def __save(entries: list[Entry]): path.parent.mkdir(parents=True, exist_ok=True) data = {"entries": [{"name": e.name, "folder": e.folder, "username": e.username} for e in entries]} path.write_text(json.dumps(data)) + path.chmod(0o600) def add(entry: Entry): diff --git a/src/bw_menu/rbw.py b/src/bw_menu/rbw.py index e4b0a7a..a4f5b01 100644 --- a/src/bw_menu/rbw.py +++ b/src/bw_menu/rbw.py @@ -31,8 +31,8 @@ def get_field(entry: Entry, field: str) -> str: data = __load_raw(entry) if field == "password": - return data["data"]["password"] or "" - return "" + return data.get("data", {}).get("password") or "" + raise ValueError(f"Unknown field: {field}") def __get_totp(entry: Entry) -> str: diff --git a/src/bw_menu/selector/__init__.py b/src/bw_menu/selector/__init__.py index 58d280a..79e9fcb 100644 --- a/src/bw_menu/selector/__init__.py +++ b/src/bw_menu/selector/__init__.py @@ -1,5 +1,6 @@ from dataclasses import dataclass +from bw_menu import rbw, history from bw_menu.rbw import Entry # Record separator — safe delimiter unlikely to appear in vault entry names @@ -31,6 +32,19 @@ def decode_info(info: str) -> Entry: return Entry(parts[0], parts[1], parts[2]) +def prepare_entries() -> tuple[list[Entry], list[Entry]]: + all_entries = rbw.list_entries() + history_entries = history.load() + + all_set = set(all_entries) + history_in_list = [e for e in history_entries if e in all_set] + history_set = set(history_in_list) + remaining = [e for e in all_entries if e not in history_set] + remaining.sort(key=lambda e: (e.folder.lower(), e.name.lower())) + + return history_in_list, remaining + + def create_selector(name: str): from bw_menu.selector.rofi import RofiSelector from bw_menu.selector.fzf import FzfSelector diff --git a/src/bw_menu/selector/fzf.py b/src/bw_menu/selector/fzf.py index b75bfdf..39d99c5 100644 --- a/src/bw_menu/selector/fzf.py +++ b/src/bw_menu/selector/fzf.py @@ -1,19 +1,11 @@ import subprocess -from bw_menu import rbw, history -from bw_menu.selector import Selection, format_entry, encode_info, decode_info +from bw_menu.selector import Selection, format_entry, encode_info, decode_info, prepare_entries class FzfSelector: def run(self, args) -> Selection | None: - all_entries = rbw.list_entries() - history_entries = history.load() - - all_set = set(all_entries) - history_in_list = [e for e in history_entries if e in all_set] - history_set = set(history_in_list) - remaining = [e for e in all_entries if e not in history_set] - remaining.sort(key=lambda e: (e.folder.lower(), e.name.lower())) + history_in_list, remaining = prepare_entries() lines = [] for entry in history_in_list: diff --git a/src/bw_menu/selector/rofi.py b/src/bw_menu/selector/rofi.py index 594ae6a..cd75762 100644 --- a/src/bw_menu/selector/rofi.py +++ b/src/bw_menu/selector/rofi.py @@ -3,8 +3,8 @@ import subprocess import sys from pathlib import Path -from bw_menu import rbw, history, config -from bw_menu.selector import Selection, format_entry, encode_info, decode_info +from bw_menu import config +from bw_menu.selector import Selection, format_entry, encode_info, decode_info, prepare_entries class RofiSelector: @@ -27,6 +27,8 @@ class RofiSelector: if rofi_retv == 1: rofi_info = os.environ.get("ROFI_INFO", "") + if not rofi_info: + return None return Selection(decode_info(rofi_info), fields[0]) # kb-custom-N: ROFI_RETV 10 + (N-1) @@ -34,6 +36,8 @@ class RofiSelector: field_index = custom_index + 1 if 0 <= field_index < len(fields): rofi_info = os.environ.get("ROFI_INFO", "") + if not rofi_info: + return None return Selection(decode_info(rofi_info), fields[field_index]) return None @@ -55,14 +59,7 @@ class RofiSelector: subprocess.run(cmd) def __print_entries(self, keybindings: dict[str, str]): - all_entries = rbw.list_entries() - history_entries = history.load() - - all_set = set(all_entries) - history_in_list = [e for e in history_entries if e in all_set] - history_set = set(history_in_list) - remaining = [e for e in all_entries if e not in history_set] - remaining.sort(key=lambda e: (e.folder.lower(), e.name.lower())) + history_in_list, remaining = prepare_entries() print(f"\0prompt\x1fSelect item") print(f"\0no-custom\x1ftrue") diff --git a/src/main.py b/src/main.py deleted file mode 100755 index 5dc1e44..0000000 --- a/src/main.py +++ /dev/null @@ -1,394 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8; py-indent-offset: 4 -*- -# -# Author: Linuxfabrik GmbH, Zurich, Switzerland -# Contact: info (at) linuxfabrik (dot) ch -# https://www.linuxfabrik.ch/ -# License: The Unlicense, see LICENSE file. - -"""See the README for more details. -""" - -import argparse # pylint: disable=C0413 -import json -import logging -import os -import sys - -import pyperclip - -import bitwarden # pylint: disable=C0413 -import lib.base # pylint: disable=C0413 -import lib.shell # pylint: disable=C0413 - -__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland' -__version__ = '2024042101' - -DESCRIPTION = """todo""" - - -def parse_args(): - """Parse command line arguments using argparse. - """ - parser = argparse.ArgumentParser(description=DESCRIPTION) - - parser.add_argument( - '-V', '--version', - action='version', - version='%(prog)s: v{} by {}'.format(__version__, __author__) - ) - - # Define subcommands - subparsers = parser.add_subparsers(dest='COMMAND', required=True) - - # Select command - select_parser = subparsers.add_parser('select', help='Select command') - # we do not actually need the title, the variable is never used - # however, rofi always calls the given executable with the title of the item, - # so we need to accept it to prevent an argparse "unrecognized arguments" error - select_parser.add_argument( - help='Internal use only', - dest='TITLE', - nargs='?', - ) - - # History command with nested subcommands - history_parser = subparsers.add_parser('history', help='History related commands') - history_subparsers = history_parser.add_subparsers( - help='History subcommands', - dest='HISTORY_COMMAND', - required=True, - ) - - get_parser = history_subparsers.add_parser('get', help='Get a specific item from history') - get_parser.add_argument( - dest='HISTORY_GET_INDEX', - metavar='index', - type=int, - help='Index of the item in history', - ) - - get_parser.add_argument( - dest='HISTORY_GET_FIELD', - metavar='field', - type=str, - help='Field to retrieve', - ) - - get_parser.add_argument( - '--print', - help='Print the field instead of saving it to the clipboard.', - dest='HISTORY_GET_PRINT', - action='store_true', - default=False, - ) - - - return parser.parse_args() - - -def get_history(): - file_path = '/tmp/bw-menu-history.json' # TODO: use XDG paths - - try: - with open(file_path, 'r', encoding='UTF-8') as file: - content = file.read() - except FileNotFoundError: - # Return an empty list if the file cannot be opened - return [] - - try: - history = json.loads(content) - return history['history'] - except json.JSONDecodeError: - # Return an empty list if deserialization fails - return [] - - -def save_history(entries): - history = {'history': entries} - json_str = json.dumps(history) - - file_path = '/tmp/bw-menu-history.json' # TODO: use XDG paths - - # Write the JSON string to a file, overwriting it if it already exists - with open(file_path, 'w', encoding='UTF-8') as file: - file.write(json_str) - - -MAX_HISTORY_LEN = 5 # TODO: make configurable -def add_to_history(entry): - history = get_history() - history.insert(0, entry) - if len(history) > MAX_HISTORY_LEN: - history = history[:MAX_HISTORY_LEN] - save_history(history) - - -def save_to_clipboard(string): - pyperclip.copy(string) - - -def get_title(item, folders, collections): - title = '' - - folder_id = item.get('folderId') - if folder_id: - title += f'{folders[folder_id]}/' - - # TODO: how do we handle multiple collections nicely? i dont like the pipe for searching. my - # fav solution right now is to display the entry multiple times, eg "000/000-p-db01", "001/000-p-db01" - # get_title should return a list of titles which match the current item - collection_ids = item.get('collectionIds', []) - for collection_id in collection_ids: - title += collections[collection_id] - if collection_ids: - title += '/' - - if not title: - title += 'ungrouped/' - - title += item['name'] - - username = item['login'].get('username') - if username: - title += f' ({username})' - - return title - - -def generate_rofi_list(logins): - print('\0prompt\x1fSelect item') - print('\0message\x1fReturn | Copy password\rShift+Return | Copy username or selected field\rControl+Return | Show item') - print('\0use-hot-keys\x1ftrue') - print('\0no-custom\x1ftrue') - print('\0markup-rows\x1ftrue') - - for title in sorted(logins.keys()): - print(title, end='') - icon = 'folder' # TODO: what icons are available? ~/.icons seems to work - print(f'\0icon\x1f{icon}\x1finfo\x1f{logins[title]["id"]}') - - -def get_bw_collection_id2name_map(): - logger = logging.getLogger() - - bw = bitwarden.Bitwarden() - try: - collections = bw.get_collections() - # convert to lookup dictionary - collections = {item['id']: item['name'] for item in collections} - logger.debug('Got list of Bitwarden collections.') - except Exception: - logger.exception('Failed to get list of Bitwarden collections.') - sys.exit(1) - return collections - - -def get_bw_folders_id2name_map(): - logger = logging.getLogger() - - bw = bitwarden.Bitwarden() - try: - folders = bw.get_folders() - # convert to lookup dictionary - folders = {item['id']: item['name'] for item in folders} - logger.debug('Got list of Bitwarden folders.') - except Exception: - logger.exception('Failed to get list of Bitwarden folders.') - sys.exit(1) - return folders - - -def get_bitwarden_logins(): - logger = logging.getLogger() - - bw = bitwarden.Bitwarden() - - if not bw.is_unlocked: - logger.error('Not logged into Bitwarden, or Bitwarden Vault is locked. Please run `bw login` and `bw unlock` first.') - sys.exit(1) - - try: - # TODO: add a timeout for running the sync? - bw.sync() - logger.debug('Synchronised the Bitwarden Vault.') - except Exception: - logger.exception('Failed to sync the Bitwarden Vault.') - sys.exit(1) - - try: - items = bw.get_all_items() - # we only want items which have a login - items = [item for item in items if 'login' in item] - logger.debug('Got list of Bitwarden items.') - except Exception: - logger.exception('Failed to get list of Bitwarden items.') - sys.exit(1) - - collections = get_bw_collection_id2name_map() - folders = get_bw_folders_id2name_map() - - logins = {} - - for item in items: - title = get_title(item, folders, collections) - logins[title] = item - - return logins - - -# the flow is: -# 1. bw-menu starts rofi in the script mode -# 2. rofi calls bw-menu calls bw-menu to get a list of items to display -# 3. the user interacts with the list -# 4. rofi sets environment variables, and calls bw-menu with the title of the selected item -# 5. bw-menu acts based on the environment variables -# -# currently using the script mode so that we can pass additional info (eg the id). -# no other mode supports this, meaning we would need to match based on the title -def handle_select(): - logger = logging.getLogger() - - rofi_retv = os.environ.get('ROFI_RETV', None) - rofi_info = os.environ.get('ROFI_INFO', None) - - logger.debug(f'ROFI_RETV: {rofi_retv}') - logger.debug(f'ROFI_INFO: {rofi_info}') - - if rofi_retv is None: - # this means rofi is not running yet, so let's start it - logger.debug('Starting initial rofi process') - # we need to remap accept-alt and accept-custom so that we can use the keys for custom binds - cmd = f''' - rofi - -show bw - -modes 'bw:{sys.argv[0]} select' - -matching fuzzy - -sorting-method fzf - -sort - -kb-accept-alt F12 - -kb-accept-custom F11 - -kb-custom-1 Shift+Return - -kb-custom-2 Control+Return - ''' - cmd = cmd.replace('\n', ' ') - logger.debug(f'Running {cmd}') - _, stderr, retc = lib.base.coe(lib.shell.shell_exec(cmd)) - if retc != 0 or stderr: # TODO: better error handling - logger.error(f'retc: {retc}, stderr: {stderr}') - sys.exit(1) - - - else: - # this means we are being called by rofi as a custom script - # either generate the list, or act on input - rofi_retv = int(rofi_retv) - - # 0: Initial call of script - if rofi_retv == 0: - generate_rofi_list(get_bitwarden_logins()) - - else: - try: - bw = bitwarden.Bitwarden() - item = bw.get_item_by_id(rofi_info) - except Exception: - logger.exception(f'Failed to get Bitwarden item with id {rofi_info}') - sys.exit(1) - - add_to_history(rofi_info) - - # 1: Selected an entry - if rofi_retv == 1: - logger.info('Copying password') - password = item['login']['password'] - save_to_clipboard(password) - - # 10-28: Custom keybinding 1-19 - elif rofi_retv == 10: - logger.info('Copying username') - username = item['login']['username'] - save_to_clipboard(username) - - elif rofi_retv == 11: - logger.info('Showing item') - print('\0prompt\x1fSelect field') - print('\0message\x1fReturn | Copy field') - print('{}\0icon\x1f{}\x1finfo\x1f{}', 'TODO', 'folder', 'single-item:todo') - # TODO: not sure how to proceed here. currently pressing return leads to the "copy password" action - # one option is to pass the action in the info and parsing that - - else: - print(f'Unknown rofi_retv: {rofi_retv}') - sys.exit(2) - - sys.exit(0) - - -def handle_history_get(index, field, print_result=False): - logger = logging.getLogger() - - history = get_history() - if index >= len(history): - print(f'Trying to get history element {index}, but the history only contains {len(history)} elements') - sys.exit(1) - - item_id = history[index] - - try: - bw = bitwarden.Bitwarden() - item = bw.get_item_by_id(item_id) - except Exception: - logger.exception(f'Failed to get Bitwarden item with id "{item_id}". It might not exist anymore.') - sys.exit(1) - - login = item.get('login') - if login is None: - logger.error(f'The Bitwarden item with id "{item_id}" does not have a login attribute.') - sys.exit(2) - - result = None - if field == 'title': - collections = get_bw_collection_id2name_map() - folders = get_bw_folders_id2name_map() - title = get_title(item, folders, collections) - result = title - - elif field in login: - result = login[field] - else: - print(f"Could not find a field named '{field}'") - sys.exit(1) - - if print_result: - print(result) - else: - save_to_clipboard(result) - - -def main(): - """The main function. Hier spielt die Musik. - """ - - logging_format = "[%(filename)s->%(funcName)s:%(lineno)s] %(message)s" - logging.basicConfig( - format=logging_format, - # level=logging.DEBUG, - ) - logger = logging.getLogger() - - logger.debug(sys.argv) - - args = parse_args() - - - if args.COMMAND == 'select': - handle_select() - elif args.COMMAND == 'history': - if args.HISTORY_COMMAND == 'get': - handle_history_get(args.HISTORY_GET_INDEX, args.HISTORY_GET_FIELD, args.HISTORY_GET_PRINT) - - -if __name__ == '__main__': - main()