converted from rust to python
This commit is contained in:
parent
7a949bde0e
commit
153a98620c
@ -3,7 +3,10 @@
|
|||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
```bash
|
```bash
|
||||||
# todo
|
git clone --branch python https://git.navidsassan.ch/navid.sassan/bw-menu.git
|
||||||
|
dnf install -y python3 python3-pyperclip
|
||||||
|
|
||||||
|
./src/main.py --help
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
|
394
src/main.py
Executable file
394
src/main.py
Executable file
@ -0,0 +1,394 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8; py-indent-offset: 4 -*-
|
||||||
|
#
|
||||||
|
# Author: Linuxfabrik GmbH, Zurich, Switzerland
|
||||||
|
# Contact: info (at) linuxfabrik (dot) ch
|
||||||
|
# https://www.linuxfabrik.ch/
|
||||||
|
# License: The Unlicense, see LICENSE file.
|
||||||
|
|
||||||
|
"""See the README for more details.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse # pylint: disable=C0413
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import pyperclip
|
||||||
|
|
||||||
|
import lib.base # pylint: disable=C0413
|
||||||
|
import lib.bitwarden # pylint: disable=C0413
|
||||||
|
import lib.shell # pylint: disable=C0413
|
||||||
|
|
||||||
|
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
|
||||||
|
__version__ = '2024042101'
|
||||||
|
|
||||||
|
DESCRIPTION = """todo"""
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args():
|
||||||
|
"""Parse command line arguments using argparse.
|
||||||
|
"""
|
||||||
|
parser = argparse.ArgumentParser(description=DESCRIPTION)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
'-V', '--version',
|
||||||
|
action='version',
|
||||||
|
version='%(prog)s: v{} by {}'.format(__version__, __author__)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Define subcommands
|
||||||
|
subparsers = parser.add_subparsers(dest='COMMAND', required=True)
|
||||||
|
|
||||||
|
# Select command
|
||||||
|
select_parser = subparsers.add_parser('select', help='Select command')
|
||||||
|
# we do not actually need the title, the variable is never used
|
||||||
|
# however, rofi always calls the given executable with the title of the item,
|
||||||
|
# so we need to accept it to prevent an argparse "unrecognized arguments" error
|
||||||
|
select_parser.add_argument(
|
||||||
|
help='Internal use only',
|
||||||
|
dest='TITLE',
|
||||||
|
nargs='?',
|
||||||
|
)
|
||||||
|
|
||||||
|
# History command with nested subcommands
|
||||||
|
history_parser = subparsers.add_parser('history', help='History related commands')
|
||||||
|
history_subparsers = history_parser.add_subparsers(
|
||||||
|
help='History subcommands',
|
||||||
|
dest='HISTORY_COMMAND',
|
||||||
|
required=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
get_parser = history_subparsers.add_parser('get', help='Get a specific item from history')
|
||||||
|
get_parser.add_argument(
|
||||||
|
dest='HISTORY_GET_INDEX',
|
||||||
|
metavar='index',
|
||||||
|
type=int,
|
||||||
|
help='Index of the item in history',
|
||||||
|
)
|
||||||
|
|
||||||
|
get_parser.add_argument(
|
||||||
|
dest='HISTORY_GET_FIELD',
|
||||||
|
metavar='field',
|
||||||
|
type=str,
|
||||||
|
help='Field to retrieve',
|
||||||
|
)
|
||||||
|
|
||||||
|
get_parser.add_argument(
|
||||||
|
'--print',
|
||||||
|
help='Print the field instead of saving it to the clipboard.',
|
||||||
|
dest='HISTORY_GET_PRINT',
|
||||||
|
action='store_true',
|
||||||
|
default=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
def get_history():
|
||||||
|
file_path = '/tmp/bw-menu-history.json' # TODO: use XDG paths
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(file_path, 'r', encoding='UTF-8') as file:
|
||||||
|
content = file.read()
|
||||||
|
except FileNotFoundError:
|
||||||
|
# Return an empty list if the file cannot be opened
|
||||||
|
return []
|
||||||
|
|
||||||
|
try:
|
||||||
|
history = json.loads(content)
|
||||||
|
return history['history']
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
# Return an empty list if deserialization fails
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def save_history(entries):
|
||||||
|
history = {'history': entries}
|
||||||
|
json_str = json.dumps(history)
|
||||||
|
|
||||||
|
file_path = '/tmp/bw-menu-history.json' # TODO: use XDG paths
|
||||||
|
|
||||||
|
# Write the JSON string to a file, overwriting it if it already exists
|
||||||
|
with open(file_path, 'w', encoding='UTF-8') as file:
|
||||||
|
file.write(json_str)
|
||||||
|
|
||||||
|
|
||||||
|
MAX_HISTORY_LEN = 5 # TODO: make configurable
|
||||||
|
def add_to_history(entry):
|
||||||
|
history = get_history()
|
||||||
|
history.insert(0, entry)
|
||||||
|
if len(history) > MAX_HISTORY_LEN:
|
||||||
|
history = history[:MAX_HISTORY_LEN]
|
||||||
|
save_history(history)
|
||||||
|
|
||||||
|
|
||||||
|
def save_to_clipboard(string):
|
||||||
|
pyperclip.copy(string)
|
||||||
|
|
||||||
|
|
||||||
|
def get_title(item, folders, collections):
|
||||||
|
title = ''
|
||||||
|
|
||||||
|
folder_id = item.get('folderId')
|
||||||
|
if folder_id:
|
||||||
|
title += f'{folders[folder_id]}/'
|
||||||
|
|
||||||
|
# TODO: how do we handle multiple collections nicely? i dont like the pipe for searching. my
|
||||||
|
# fav solution right now is to display the entry multiple times, eg "000/000-p-db01", "001/000-p-db01"
|
||||||
|
# get_title should return a list of titles which match the current item
|
||||||
|
collection_ids = item.get('collectionIds', [])
|
||||||
|
for collection_id in collection_ids:
|
||||||
|
title += collections[collection_id]
|
||||||
|
if collection_ids:
|
||||||
|
title += '/'
|
||||||
|
|
||||||
|
if not title:
|
||||||
|
title += 'ungrouped/'
|
||||||
|
|
||||||
|
title += item['name']
|
||||||
|
|
||||||
|
username = item['login'].get('username')
|
||||||
|
if username:
|
||||||
|
title += f' ({username})'
|
||||||
|
|
||||||
|
return title
|
||||||
|
|
||||||
|
|
||||||
|
def generate_rofi_list(logins):
|
||||||
|
print('\0prompt\x1fSelect item')
|
||||||
|
print('\0message\x1fReturn | Copy password\rShift+Return | Copy username or selected field\rControl+Return | Show item')
|
||||||
|
print('\0use-hot-keys\x1ftrue')
|
||||||
|
print('\0no-custom\x1ftrue')
|
||||||
|
print('\0markup-rows\x1ftrue')
|
||||||
|
|
||||||
|
for title in sorted(logins.keys()):
|
||||||
|
print(title, end='')
|
||||||
|
icon = 'folder' # TODO: what icons are available? ~/.icons seems to work
|
||||||
|
print(f'\0icon\x1f{icon}\x1finfo\x1f{logins[title]["id"]}')
|
||||||
|
|
||||||
|
|
||||||
|
def get_bw_collection_id2name_map():
|
||||||
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
bw = lib.bitwarden.Bitwarden()
|
||||||
|
try:
|
||||||
|
collections = bw.get_collections()
|
||||||
|
# convert to lookup dictionary
|
||||||
|
collections = {item['id']: item['name'] for item in collections}
|
||||||
|
logger.debug('Got list of Bitwarden collections.')
|
||||||
|
except Exception:
|
||||||
|
logger.exception('Failed to get list of Bitwarden collections.')
|
||||||
|
sys.exit(1)
|
||||||
|
return collections
|
||||||
|
|
||||||
|
|
||||||
|
def get_bw_folders_id2name_map():
|
||||||
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
bw = lib.bitwarden.Bitwarden()
|
||||||
|
try:
|
||||||
|
folders = bw.get_folders()
|
||||||
|
# convert to lookup dictionary
|
||||||
|
folders = {item['id']: item['name'] for item in folders}
|
||||||
|
logger.debug('Got list of Bitwarden folders.')
|
||||||
|
except Exception:
|
||||||
|
logger.exception('Failed to get list of Bitwarden folders.')
|
||||||
|
sys.exit(1)
|
||||||
|
return folders
|
||||||
|
|
||||||
|
|
||||||
|
def get_bitwarden_logins():
|
||||||
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
bw = lib.bitwarden.Bitwarden()
|
||||||
|
|
||||||
|
if not bw.is_unlocked:
|
||||||
|
logger.error('Not logged into Bitwarden, or Bitwarden Vault is locked. Please run `bw login` and `bw unlock` first.')
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# TODO: add a timeout for running the sync?
|
||||||
|
bw.sync()
|
||||||
|
logger.debug('Synchronised the Bitwarden Vault.')
|
||||||
|
except Exception:
|
||||||
|
logger.exception('Failed to sync the Bitwarden Vault.')
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
items = bw.get_all_items()
|
||||||
|
# we only want items which have a login
|
||||||
|
items = [item for item in items if 'login' in item]
|
||||||
|
logger.debug('Got list of Bitwarden items.')
|
||||||
|
except Exception:
|
||||||
|
logger.exception('Failed to get list of Bitwarden items.')
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
collections = get_bw_collection_id2name_map()
|
||||||
|
folders = get_bw_folders_id2name_map()
|
||||||
|
|
||||||
|
logins = {}
|
||||||
|
|
||||||
|
for item in items:
|
||||||
|
title = get_title(item, folders, collections)
|
||||||
|
logins[title] = item
|
||||||
|
|
||||||
|
return logins
|
||||||
|
|
||||||
|
|
||||||
|
# the flow is:
|
||||||
|
# 1. bw-menu starts rofi in the script mode
|
||||||
|
# 2. rofi calls bw-menu calls bw-menu to get a list of items to display
|
||||||
|
# 3. the user interacts with the list
|
||||||
|
# 4. rofi sets environment variables, and calls bw-menu with the title of the selected item
|
||||||
|
# 5. bw-menu acts based on the environment variables
|
||||||
|
#
|
||||||
|
# currently using the script mode so that we can pass additional info (eg the id).
|
||||||
|
# no other mode supports this, meaning we would need to match based on the title
|
||||||
|
def handle_select():
|
||||||
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
rofi_retv = os.environ.get('ROFI_RETV', None)
|
||||||
|
rofi_info = os.environ.get('ROFI_INFO', None)
|
||||||
|
|
||||||
|
logger.debug(f'ROFI_RETV: {rofi_retv}')
|
||||||
|
logger.debug(f'ROFI_INFO: {rofi_info}')
|
||||||
|
|
||||||
|
if rofi_retv is None:
|
||||||
|
# this means rofi is not running yet, so let's start it
|
||||||
|
logger.debug('Starting initial rofi process')
|
||||||
|
# we need to remap accept-alt and accept-custom so that we can use the keys for custom binds
|
||||||
|
cmd = f'''
|
||||||
|
rofi
|
||||||
|
-show bw
|
||||||
|
-modes 'bw:{sys.argv[0]} select'
|
||||||
|
-matching fuzzy
|
||||||
|
-sorting-method fzf
|
||||||
|
-sort
|
||||||
|
-kb-accept-alt F12
|
||||||
|
-kb-accept-custom F11
|
||||||
|
-kb-custom-1 Shift+Return
|
||||||
|
-kb-custom-2 Control+Return
|
||||||
|
'''
|
||||||
|
cmd = cmd.replace('\n', ' ')
|
||||||
|
logger.debug(f'Running {cmd}')
|
||||||
|
_, stderr, retc = lib.base.coe(lib.shell.shell_exec(cmd))
|
||||||
|
if retc != 0 or stderr: # TODO: better error handling
|
||||||
|
logger.error(f'retc: {retc}, stderr: {stderr}')
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
else:
|
||||||
|
# this means we are being called by rofi as a custom script
|
||||||
|
# either generate the list, or act on input
|
||||||
|
rofi_retv = int(rofi_retv)
|
||||||
|
|
||||||
|
# 0: Initial call of script
|
||||||
|
if rofi_retv == 0:
|
||||||
|
generate_rofi_list(get_bitwarden_logins())
|
||||||
|
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
bw = lib.bitwarden.Bitwarden()
|
||||||
|
item = bw.get_item_by_id(rofi_info)
|
||||||
|
except Exception:
|
||||||
|
logger.exception(f'Failed to get Bitwarden item with id {rofi_info}')
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
add_to_history(rofi_info)
|
||||||
|
|
||||||
|
# 1: Selected an entry
|
||||||
|
if rofi_retv == 1:
|
||||||
|
logger.info('Copying password')
|
||||||
|
password = item['login']['password']
|
||||||
|
save_to_clipboard(password)
|
||||||
|
|
||||||
|
# 10-28: Custom keybinding 1-19
|
||||||
|
elif rofi_retv == 10:
|
||||||
|
logger.info('Copying username')
|
||||||
|
username = item['login']['username']
|
||||||
|
save_to_clipboard(username)
|
||||||
|
|
||||||
|
elif rofi_retv == 11:
|
||||||
|
logger.info('Showing item')
|
||||||
|
print('\0prompt\x1fSelect field')
|
||||||
|
print('\0message\x1fReturn | Copy field')
|
||||||
|
print('{}\0icon\x1f{}\x1finfo\x1f{}', 'TODO', 'folder', 'single-item:todo')
|
||||||
|
# TODO: not sure how to proceed here. currently pressing return leads to the "copy password" action
|
||||||
|
# one option is to pass the action in the info and parsing that
|
||||||
|
|
||||||
|
else:
|
||||||
|
print(f'Unknown rofi_retv: {rofi_retv}')
|
||||||
|
sys.exit(2)
|
||||||
|
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
||||||
|
def handle_history_get(index, field, print_result=False):
|
||||||
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
history = get_history()
|
||||||
|
if index >= len(history):
|
||||||
|
print(f'Trying to get history element {index}, but the history only contains {len(history)} elements')
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
item_id = history[index]
|
||||||
|
|
||||||
|
try:
|
||||||
|
bw = lib.bitwarden.Bitwarden()
|
||||||
|
item = bw.get_item_by_id(item_id)
|
||||||
|
except Exception:
|
||||||
|
logger.exception(f'Failed to get Bitwarden item with id "{item_id}". It might not exist anymore.')
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
login = item.get('login')
|
||||||
|
if login is None:
|
||||||
|
logger.error(f'The Bitwarden item with id "{item_id}" does not have a login attribute.')
|
||||||
|
sys.exit(2)
|
||||||
|
|
||||||
|
result = None
|
||||||
|
if field == 'title':
|
||||||
|
collections = get_bw_collection_id2name_map()
|
||||||
|
folders = get_bw_folders_id2name_map()
|
||||||
|
title = get_title(item, folders, collections)
|
||||||
|
result = title
|
||||||
|
|
||||||
|
elif field in login:
|
||||||
|
result = login[field]
|
||||||
|
else:
|
||||||
|
print(f"Could not find a field named '{field}'")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if print_result:
|
||||||
|
print(result)
|
||||||
|
else:
|
||||||
|
save_to_clipboard(result)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""The main function. Hier spielt die Musik.
|
||||||
|
"""
|
||||||
|
|
||||||
|
logging_format = "[%(filename)s->%(funcName)s:%(lineno)s] %(message)s"
|
||||||
|
logging.basicConfig(
|
||||||
|
format=logging_format,
|
||||||
|
# level=logging.DEBUG,
|
||||||
|
)
|
||||||
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
logger.debug(sys.argv)
|
||||||
|
|
||||||
|
args = parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
if args.COMMAND == 'select':
|
||||||
|
handle_select()
|
||||||
|
elif args.COMMAND == 'history':
|
||||||
|
if args.HISTORY_COMMAND == 'get':
|
||||||
|
handle_history_get(args.HISTORY_GET_INDEX, args.HISTORY_GET_FIELD, args.HISTORY_GET_PRINT)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
Loading…
x
Reference in New Issue
Block a user