Compare commits

..

No commits in common. "python" and "main" have entirely different histories.
python ... main

10 changed files with 827 additions and 848 deletions

16
.gitignore vendored Normal file
View File

@ -0,0 +1,16 @@
# ---> Rust
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb

17
Cargo.toml Normal file
View File

@ -0,0 +1,17 @@
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[package]
name = "bw-menu"
version = "0.1.0-next"
authors = ["Navid Sassan <navid@sassan.ch>"]
edition = "2021"
license = "The Unlicense"
[dependencies]
clap = { version = "4.4", features = ["derive"] }
env_logger = "0.10"
itertools = "0.11"
log = "0.4"
reqwest = { version = "0.11", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

24
LICENSE Normal file
View File

@ -0,0 +1,24 @@
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
For more information, please refer to <http://unlicense.org/>

View File

@ -1,17 +1,26 @@
# Bitwarden Menu
# bw-menu
Bitwarden Menu
## Installation
```bash
# install an up-to-date rust and cargo, see https://rustup.rs/
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# install dependencies
sudo dnf install -y python3 python3-pyperclip xclip rofi
sudo dnf install rofi
git clone --branch python https://git.navidsassan.ch/navid.sassan/bw-menu.git
git clone https://github.com/Linuxfabrik/lib.git
mkdir -p ~/.cargo
cat > ~/.cargo/config.toml << 'EOF'
[registries.gitea]
index = "https://git.navidsassan.ch/navid.sassan/_cargo-index.git"
cd bw-menu
ln -s ../../lib src/lib
./src/main.py --help
[net]
git-fetch-with-cli = true
EOF
cargo install bw-menu --registry gitea
~/.cargo/bin/bw-menu --version
```
@ -20,11 +29,18 @@ ln -s ../../lib src/lib
# start bw serve
export BW_SESSION="$(bw unlock --raw)" && bw serve --hostname 127.0.0.1&
# get dropdown (takes a second)
./src/main.py select
bw-menu select
# get password of last selected entry
./src/main.py history get 0 password
bw-menu history get 0 password
# list 5 last selected entries
./src/main.py history list
# list 5 last selected entries (currently saved in `/tmp`)
bw-menu history list
```
## Known Issues
* The collections of entries that are in multiple collections are not displayed correctly
* The history is reset after every reboot if `/tmp` is not persistent
A re-write in Python is planned, during which these issues will be fixed.

View File

@ -1,443 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2022, Linuxfabrik, Zurich, Switzerland, https://www.linuxfabrik.ch
# The Unlicense (see LICENSE or https://unlicense.org/)
from __future__ import absolute_import, division, print_function
import email.encoders
import email.mime.application
import email.mime.multipart
import email.mime.nonmultipart
import email.parser
import email.utils
import json
import mimetypes
import os
import urllib.parse
from urllib.error import HTTPError, URLError
from ansible.module_utils.common.collections import Mapping
from ansible.module_utils.six import PY2, PY3, string_types
from ansible.module_utils.six.moves import cStringIO
try:
import email.policy
except ImportError:
# Py2
import email.generator
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.module_utils.urls import (ConnectionError, SSLValidationError,
open_url)
def prepare_multipart_no_base64(fields):
"""Taken from ansible.module_utils.urls, but adjusted to not no encoding as the bitwarden API does not work with that
(even though it should according to the RFC, Content-Transfer-Encoding is deprecated but not removed).
See https://github.com/ansible/ansible/issues/73621
Takes a mapping, and prepares a multipart/form-data body
:arg fields: Mapping
:returns: tuple of (content_type, body) where ``content_type`` is
the ``multipart/form-data`` ``Content-Type`` header including
``boundary`` and ``body`` is the prepared bytestring body
Payload content from a file will be base64 encoded and will include
the appropriate ``Content-Transfer-Encoding`` and ``Content-Type``
headers.
Example:
{
"file1": {
"filename": "/bin/true",
"mime_type": "application/octet-stream"
},
"file2": {
"content": "text based file content",
"filename": "fake.txt",
"mime_type": "text/plain",
},
"text_form_field": "value"
}
"""
if not isinstance(fields, Mapping):
raise TypeError(
'Mapping is required, cannot be type %s' % fields.__class__.__name__
)
m = email.mime.multipart.MIMEMultipart('form-data')
for field, value in sorted(fields.items()):
if isinstance(value, string_types):
main_type = 'text'
sub_type = 'plain'
content = value
filename = None
elif isinstance(value, Mapping):
filename = value.get('filename')
content = value.get('content')
if not any((filename, content)):
raise ValueError('at least one of filename or content must be provided')
mime = value.get('mime_type')
if not mime:
try:
mime = mimetypes.guess_type(filename or '', strict=False)[0] or 'application/octet-stream'
except Exception:
mime = 'application/octet-stream'
main_type, sep, sub_type = mime.partition('/')
else:
raise TypeError(
'value must be a string, or mapping, cannot be type %s' % value.__class__.__name__
)
if not content and filename:
with open(to_bytes(filename, errors='surrogate_or_strict'), 'rb') as f:
part = email.mime.application.MIMEApplication(f.read(), _encoder=email.encoders.encode_noop)
del part['Content-Type']
part.add_header('Content-Type', '%s/%s' % (main_type, sub_type))
else:
part = email.mime.nonmultipart.MIMENonMultipart(main_type, sub_type)
part.set_payload(to_bytes(content))
part.add_header('Content-Disposition', 'form-data')
del part['MIME-Version']
part.set_param(
'name',
field,
header='Content-Disposition'
)
if filename:
part.set_param(
'filename',
to_native(os.path.basename(filename)),
header='Content-Disposition'
)
m.attach(part)
if PY3:
# Ensure headers are not split over multiple lines
# The HTTP policy also uses CRLF by default
b_data = m.as_bytes(policy=email.policy.HTTP)
else:
# Py2
# We cannot just call ``as_string`` since it provides no way
# to specify ``maxheaderlen``
fp = cStringIO() # cStringIO seems to be required here
# Ensure headers are not split over multiple lines
g = email.generator.Generator(fp, maxheaderlen=0)
g.flatten(m)
# ``fix_eols`` switches from ``\n`` to ``\r\n``
b_data = email.utils.fix_eols(fp.getvalue())
del m
headers, sep, b_content = b_data.partition(b'\r\n\r\n')
del b_data
if PY3:
parser = email.parser.BytesHeaderParser().parsebytes
else:
# Py2
parser = email.parser.HeaderParser().parsestr
return (
parser(headers)['content-type'], # Message converts to native strings
b_content
)
class BitwardenException(Exception):
pass
class Bitwarden(object):
# https://bitwarden.com/help/vault-management-api
def __init__(self, hostname='127.0.0.1', port=8087):
self._base_url = 'http://%s:%s' % (hostname, port)
def _api_call(self, url_path, method='GET', body=None, body_format='json'):
url = '%s/%s' % (self._base_url, url_path)
headers = {}
if body:
if body_format == 'json':
body = json.dumps(body)
headers['Content-Type'] = 'application/json'
elif body_format == 'form-multipart':
try:
content_type, body = prepare_multipart_no_base64(body)
except (TypeError, ValueError) as e:
BitwardenException('failed to parse body as form-multipart: %s' % to_native(e))
headers['Content-Type'] = content_type
# mostly taken from ansible.builtin.url lookup plugin
try:
response = open_url(url, method=method, data=body, headers=headers)
except HTTPError as e:
raise BitwardenException("Received HTTP error for %s : %s" % (url, to_native(e)))
except URLError as e:
raise BitwardenException("Failed lookup url for %s : %s" % (url, to_native(e)))
except SSLValidationError as e:
raise BitwardenException("Error validating the server's certificate for %s: %s" % (url, to_native(e)))
except ConnectionError as e:
raise BitwardenException("Error connecting to %s: %s" % (url, to_native(e)))
try:
result = json.loads(to_text(response.read()))
except json.decoder.JSONDecodeError as e:
raise BitwardenException('Unable to load JSON: %s' % (to_native(e)))
if not result.get('success'):
raise BitwardenException('API call failed: %s' % (result.get('data')))
return result
def is_unlocked(self):
"""Check if the Bitwarden vault is unlocked.
"""
result = self._api_call('status')
return result['data']['template']['status'] == 'unlocked'
def sync(self):
"""Pull the latest vault data from server.
"""
return self._api_call('sync', method='POST')
def get_items(self, name, username=None, folder_id=None, collection_id=None, organisation_id=None):
"""Search for items in Bitwarden. Returns a list of the items that *exactly* matches all the parameters.
A complete object:
{
"object": "item",
"id": "60020baa-e876-4fd4-b5bc-259b5e6389a8",
"organizationId": "44906ecb-b307-47a5-92b4-a097745592ed",
"folderId": null,
"type": 1,
"reprompt": 0,
"name": "myhost - purpose",
"notes": "Generated by Ansible.",
"favorite": false,
"login": {
"uris": [
{
"match": null,
"uri": "https://www.example.com"
}
],
"username": "username",
"password": "password",
"totp": null,
"passwordRevisionDate": null
},
"collectionIds": [
"153e991a-a56f-4e5d-9dea-c13b9e693fc4"
],
"revisionDate": "2022-06-26T06:00:00.000Z"
}
"""
params = urllib.parse.urlencode(
{
'search': name,
},
quote_via=urllib.parse.quote,
)
result = self._api_call('list/object/items?%s' % (params))
# make sure that all the given parameters exactly match the requested one, as `bw` is not that precise (only performs a search)
# we are not using the filter parameters of the `bw` utility, as they perform an OR operation, but we want AND
matching_items = []
for item in result['data']['data']:
if item['name'] == name \
and (item['login']['username'] == username) \
and (item['folderId'] == folder_id) \
and (
# cover case if collectionIds is an empty list
(collection_id is None and not item.get('collectionIds')) \
or \
(collection_id in item.get('collectionIds', [])) \
) \
and (item['organizationId'] == organisation_id):
matching_items.append(item)
return matching_items
def get_all_items(self):
"""Get a list with all item from Bitwarden.
"""
result = self._api_call('list/object/items')
return result['data']['data']
def get_item_by_id(self, item_id):
"""Get an item by ID from Bitwarden. Returns the item or None. Throws an exception if the id leads to unambiguous results.
"""
result = self._api_call('object/item/%s' % (item_id))
return result['data']
def get_collections(self):
"""Get a list with all collections from Bitwarden.
"""
result = self._api_call('list/object/collections')
return result['data']['data']
def get_folders(self):
"""Get a list with all folders from Bitwarden.
"""
result = self._api_call('list/object/folders')
return result['data']['data']
def generate(self, password_length=40,
password_uppercase=True, password_lowercase=True,
password_numeric=True, password_special=False):
"""Return a generated password.
"""
params = 'length=%i%s%s%s%s' % (
password_length,
'&uppercase' if password_uppercase else '',
'&lowercase' if password_lowercase else '',
'&number' if password_numeric else '',
'&special' if password_special else '',
)
result = self._api_call('generate?%s' % (params))
return result['data']['data']
def get_template_item_login_uri(self, uris):
"""Get an item.login.uri object from the vault.
A complete object:
{
"match": null,
"uri": "https://google.com"
}
"""
login_uris = []
if uris:
# To create uris, fetch the JSON structure for that.
result = self._api_call('object/template/item.login.uri')
template = result['data']['template']
for uri in uris:
login_uri = template.copy() # make sure we are not editing the same object repeatedly
login_uri['uri'] = uri
login_uris.append(login_uri)
return login_uris
def get_template_item_login(self, username=None, password=None, login_uris=None):
"""Get an item.login object from the vault.
A complete object:
{
"uris": [],
"username": "jdoe",
"password": "myp@ssword123",
"totp": "JBSWY3DPEHPK3PXP"
}
"""
# To create a login item, fetch the JSON structure for that.
result = self._api_call('object/template/item.login')
login = result['data']['template']
login['password'] = password
login['totp'] = ''
login['uris'] = login_uris or []
login['username'] = username
return login
def get_template_item(self, name, login=None, notes=None, organization_id=None, collection_ids=None, folder_id=None):
"""Get an item.login object from the vault.
A complete item object:
{
"organizationId": null,
"collectionIds": null,
"folderId": null,
"type": 1,
"name": "Item name",
"notes": "Some notes about this item.",
"favorite": false,
"fields": [],
"login": null,
"secureNote": null,
"card": null,
"identity": null,
"reprompt": 0
}
"""
# To create an item later on, fetch the item JSON structure, and fill in the appropriate
# values.
result = self._api_call('object/template/item')
item = result['data']['template']
item['collectionIds'] = collection_ids
item['folderId'] = folder_id
item['login'] = login
item['name'] = name
item['notes'] = notes
item['organizationId'] = organization_id
return item
def create_item(self, item):
"""Creates an item object in Bitwarden.
"""
result = self._api_call('object/item', method='POST', body=item)
return result['data']
def edit_item(self, item, item_id):
"""Edits an item object in Bitwarden.
"""
result = self._api_call('object/item/%s' % (item_id), method='PUT', body=item)
return result['data']
def add_attachment(self, item_id, attachment_path):
"""Adds the file at `attachment_path` to the item specified by `item_id`
"""
body = {
'file': {
'filename': attachment_path,
},
}
result = self._api_call('attachment?itemId=%s' % (item_id), method='POST', body=body, body_format='form-multipart')
return result
@staticmethod
def get_pretty_name(name, hostname=None, purpose=None):
"""create a nice name for the item if none is given
schemes:
* hostname - purpose (for example "app4711 - MariaDB")
* hostname (for example "app4711")
"""
if not name:
name = hostname
if purpose:
name += ' - {}'.format(purpose)
return name

85
src/bitwarden_api/mod.rs Normal file
View File

@ -0,0 +1,85 @@
pub mod model;
// https://bitwarden.com/help/vault-management-api/
pub fn sync() -> Result<model::response::Sync, Box<dyn std::error::Error>> {
let client = reqwest::blocking::Client::new();
let response = client.post("http://localhost:8087/sync")
.send()?
.json::<model::response::Sync>()?;
if response.success {
Ok(response)
} else {
Err("success != true in JSON response".into())
}
}
pub fn status() -> Result<model::response::Status, Box<dyn std::error::Error>> {
let client = reqwest::blocking::Client::new();
let response = client.get("http://localhost:8087/status")
.send()?
.json::<model::response::Status>()?;
if response.success {
Ok(response)
} else {
Err("success != true in JSON response".into())
}
}
pub fn list_object_items() -> Result<model::response::ListObjectItems, Box<dyn std::error::Error>> {
let client = reqwest::blocking::Client::new();
let response = client.get("http://localhost:8087/list/object/items")
// let response = client.get("http://localhost:8087/list/object/items?folderId=50d2f507-8f0a-416c-80a4-afb100a77700")
.send()?;
// dbg!(response.text());
// panic!();
let json = response.json::<model::response::ListObjectItems>()?;
if json.success {
Ok(json)
} else {
Err("success != true in JSON response".into())
}
}
pub fn list_object_collections() -> Result<model::response::ListObjectCollections, Box<dyn std::error::Error>> {
let client = reqwest::blocking::Client::new();
let response = client.get("http://localhost:8087/list/object/collections")
.send()?
.json::<model::response::ListObjectCollections>()?;
if response.success {
Ok(response)
} else {
Err("success != true in JSON response".into())
}
}
pub fn list_object_folders() -> Result<model::response::ListObjectFolders, Box<dyn std::error::Error>> {
let client = reqwest::blocking::Client::new();
let response = client.get("http://localhost:8087/list/object/folders")
.send()?
.json::<model::response::ListObjectFolders>()?;
if response.success {
Ok(response)
} else {
Err("success != true in JSON response".into())
}
}
pub fn object_item(id: &String) -> Result<model::response::ObjectItem, Box<dyn std::error::Error>> {
let client = reqwest::blocking::Client::new();
let response = client.get(format!("http://localhost:8087/object/item/{}", id))
.send()?
.json::<model::response::ObjectItem>()?;
if response.success {
Ok(response)
} else {
Err("success != true in JSON response".into())
}
}

View File

@ -0,0 +1,2 @@
#[allow(non_snake_case)]
pub mod response;

View File

@ -0,0 +1,125 @@
use serde::Deserialize;
// TODO: either use rbw-agent as the backend, or adjust structs to rbw db.rs & api.rs
#[derive(Debug, Deserialize)]
pub struct Sync {
pub success: bool,
}
#[derive(Debug, Deserialize)]
pub struct Status {
pub success: bool,
pub data: StatusData,
}
#[derive(Debug, Deserialize)]
pub struct StatusData {
pub object: String,
pub template: StatusDataTemplate,
}
#[derive(Debug, Deserialize)]
pub struct StatusDataTemplate {
pub serverUrl: String,
pub lastSync: String,
pub userEmail: String,
pub userId: String,
pub status: String,
}
#[derive(Debug, Deserialize)]
pub struct ListObjectItems {
pub success: bool,
pub data: ListObjectItemsData,
}
#[derive(Debug, Deserialize)]
pub struct ListObjectItemsData {
pub object: String,
pub data: Vec<ListObjectItemsDataData>,
}
#[derive(Debug, Deserialize)]
pub struct ListObjectItemsDataData {
pub object: String,
pub id: String,
pub organizationId: Option<String>,
pub folderId: Option<String>,
#[serde(rename="type")]
pub _type: i32,
pub reprompt: i32,
pub name: String,
pub notes: Option<String>,
pub favorite: bool,
pub login: Option<ListObjectItemsDataLogin>,
pub collectionIds: Vec<String>,
pub revisionDate: String,
}
#[derive(Debug, Deserialize)]
pub struct ListObjectItemsDataLogin {
pub uris: Option<Vec<ListObjectItemsDataLoginURIs>>,
pub username: Option<String>,
pub password: Option<String>,
pub totp: Option<String>,
pub passwordRevisionDate: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct ListObjectItemsDataLoginURIs {
#[serde(rename="match")]
pub _match: Option<i32>,
pub uri: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct ListObjectCollections {
pub success: bool,
pub data: ListObjectCollectionsData,
}
#[derive(Debug, Deserialize)]
pub struct ListObjectCollectionsData {
pub object: String,
pub data: Vec<ListObjectCollectionsDataData>,
}
#[derive(Debug, Deserialize)]
pub struct ListObjectCollectionsDataData {
pub object: String,
pub id: String,
pub organizationId: String,
pub name: String,
pub externalId: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct ListObjectFolders {
pub success: bool,
pub data: ListObjectFoldersData,
}
#[derive(Debug, Deserialize)]
pub struct ListObjectFoldersData {
pub object: String,
pub data: Vec<ListObjectFoldersDataData>,
}
#[derive(Debug, Deserialize)]
pub struct ListObjectFoldersDataData {
pub object: String,
pub id: Option<String>,
pub name: String,
}
#[derive(Debug, Deserialize)]
pub struct ObjectItem {
pub success: bool,
pub data: ListObjectItemsDataData,
}

View File

@ -1,394 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author: Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
# https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.
"""See the README for more details.
"""
import argparse # pylint: disable=C0413
import json
import logging
import os
import sys
import pyperclip
import bitwarden # pylint: disable=C0413
import lib.base # pylint: disable=C0413
import lib.shell # pylint: disable=C0413
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2024042101'
DESCRIPTION = """todo"""
def parse_args():
"""Parse command line arguments using argparse.
"""
parser = argparse.ArgumentParser(description=DESCRIPTION)
parser.add_argument(
'-V', '--version',
action='version',
version='%(prog)s: v{} by {}'.format(__version__, __author__)
)
# Define subcommands
subparsers = parser.add_subparsers(dest='COMMAND', required=True)
# Select command
select_parser = subparsers.add_parser('select', help='Select command')
# we do not actually need the title, the variable is never used
# however, rofi always calls the given executable with the title of the item,
# so we need to accept it to prevent an argparse "unrecognized arguments" error
select_parser.add_argument(
help='Internal use only',
dest='TITLE',
nargs='?',
)
# History command with nested subcommands
history_parser = subparsers.add_parser('history', help='History related commands')
history_subparsers = history_parser.add_subparsers(
help='History subcommands',
dest='HISTORY_COMMAND',
required=True,
)
get_parser = history_subparsers.add_parser('get', help='Get a specific item from history')
get_parser.add_argument(
dest='HISTORY_GET_INDEX',
metavar='index',
type=int,
help='Index of the item in history',
)
get_parser.add_argument(
dest='HISTORY_GET_FIELD',
metavar='field',
type=str,
help='Field to retrieve',
)
get_parser.add_argument(
'--print',
help='Print the field instead of saving it to the clipboard.',
dest='HISTORY_GET_PRINT',
action='store_true',
default=False,
)
return parser.parse_args()
def get_history():
file_path = '/tmp/bw-menu-history.json' # TODO: use XDG paths
try:
with open(file_path, 'r', encoding='UTF-8') as file:
content = file.read()
except FileNotFoundError:
# Return an empty list if the file cannot be opened
return []
try:
history = json.loads(content)
return history['history']
except json.JSONDecodeError:
# Return an empty list if deserialization fails
return []
def save_history(entries):
history = {'history': entries}
json_str = json.dumps(history)
file_path = '/tmp/bw-menu-history.json' # TODO: use XDG paths
# Write the JSON string to a file, overwriting it if it already exists
with open(file_path, 'w', encoding='UTF-8') as file:
file.write(json_str)
MAX_HISTORY_LEN = 5 # TODO: make configurable
def add_to_history(entry):
history = get_history()
history.insert(0, entry)
if len(history) > MAX_HISTORY_LEN:
history = history[:MAX_HISTORY_LEN]
save_history(history)
def save_to_clipboard(string):
pyperclip.copy(string)
def get_title(item, folders, collections):
title = ''
folder_id = item.get('folderId')
if folder_id:
title += f'{folders[folder_id]}/'
# TODO: how do we handle multiple collections nicely? i dont like the pipe for searching. my
# fav solution right now is to display the entry multiple times, eg "000/000-p-db01", "001/000-p-db01"
# get_title should return a list of titles which match the current item
collection_ids = item.get('collectionIds', [])
for collection_id in collection_ids:
title += collections[collection_id]
if collection_ids:
title += '/'
if not title:
title += 'ungrouped/'
title += item['name']
username = item['login'].get('username')
if username:
title += f' ({username})'
return title
def generate_rofi_list(logins):
print('\0prompt\x1fSelect item')
print('\0message\x1fReturn | Copy password\rShift+Return | Copy username or selected field\rControl+Return | Show item')
print('\0use-hot-keys\x1ftrue')
print('\0no-custom\x1ftrue')
print('\0markup-rows\x1ftrue')
for title in sorted(logins.keys()):
print(title, end='')
icon = 'folder' # TODO: what icons are available? ~/.icons seems to work
print(f'\0icon\x1f{icon}\x1finfo\x1f{logins[title]["id"]}')
def get_bw_collection_id2name_map():
logger = logging.getLogger()
bw = bitwarden.Bitwarden()
try:
collections = bw.get_collections()
# convert to lookup dictionary
collections = {item['id']: item['name'] for item in collections}
logger.debug('Got list of Bitwarden collections.')
except Exception:
logger.exception('Failed to get list of Bitwarden collections.')
sys.exit(1)
return collections
def get_bw_folders_id2name_map():
logger = logging.getLogger()
bw = bitwarden.Bitwarden()
try:
folders = bw.get_folders()
# convert to lookup dictionary
folders = {item['id']: item['name'] for item in folders}
logger.debug('Got list of Bitwarden folders.')
except Exception:
logger.exception('Failed to get list of Bitwarden folders.')
sys.exit(1)
return folders
def get_bitwarden_logins():
logger = logging.getLogger()
bw = bitwarden.Bitwarden()
if not bw.is_unlocked:
logger.error('Not logged into Bitwarden, or Bitwarden Vault is locked. Please run `bw login` and `bw unlock` first.')
sys.exit(1)
try:
# TODO: add a timeout for running the sync?
bw.sync()
logger.debug('Synchronised the Bitwarden Vault.')
except Exception:
logger.exception('Failed to sync the Bitwarden Vault.')
sys.exit(1)
try:
items = bw.get_all_items()
# we only want items which have a login
items = [item for item in items if 'login' in item]
logger.debug('Got list of Bitwarden items.')
except Exception:
logger.exception('Failed to get list of Bitwarden items.')
sys.exit(1)
collections = get_bw_collection_id2name_map()
folders = get_bw_folders_id2name_map()
logins = {}
for item in items:
title = get_title(item, folders, collections)
logins[title] = item
return logins
# the flow is:
# 1. bw-menu starts rofi in the script mode
# 2. rofi calls bw-menu calls bw-menu to get a list of items to display
# 3. the user interacts with the list
# 4. rofi sets environment variables, and calls bw-menu with the title of the selected item
# 5. bw-menu acts based on the environment variables
#
# currently using the script mode so that we can pass additional info (eg the id).
# no other mode supports this, meaning we would need to match based on the title
def handle_select():
logger = logging.getLogger()
rofi_retv = os.environ.get('ROFI_RETV', None)
rofi_info = os.environ.get('ROFI_INFO', None)
logger.debug(f'ROFI_RETV: {rofi_retv}')
logger.debug(f'ROFI_INFO: {rofi_info}')
if rofi_retv is None:
# this means rofi is not running yet, so let's start it
logger.debug('Starting initial rofi process')
# we need to remap accept-alt and accept-custom so that we can use the keys for custom binds
cmd = f'''
rofi
-show bw
-modes 'bw:{sys.argv[0]} select'
-matching fuzzy
-sorting-method fzf
-sort
-kb-accept-alt F12
-kb-accept-custom F11
-kb-custom-1 Shift+Return
-kb-custom-2 Control+Return
'''
cmd = cmd.replace('\n', ' ')
logger.debug(f'Running {cmd}')
_, stderr, retc = lib.base.coe(lib.shell.shell_exec(cmd))
if retc != 0 or stderr: # TODO: better error handling
logger.error(f'retc: {retc}, stderr: {stderr}')
sys.exit(1)
else:
# this means we are being called by rofi as a custom script
# either generate the list, or act on input
rofi_retv = int(rofi_retv)
# 0: Initial call of script
if rofi_retv == 0:
generate_rofi_list(get_bitwarden_logins())
else:
try:
bw = bitwarden.Bitwarden()
item = bw.get_item_by_id(rofi_info)
except Exception:
logger.exception(f'Failed to get Bitwarden item with id {rofi_info}')
sys.exit(1)
add_to_history(rofi_info)
# 1: Selected an entry
if rofi_retv == 1:
logger.info('Copying password')
password = item['login']['password']
save_to_clipboard(password)
# 10-28: Custom keybinding 1-19
elif rofi_retv == 10:
logger.info('Copying username')
username = item['login']['username']
save_to_clipboard(username)
elif rofi_retv == 11:
logger.info('Showing item')
print('\0prompt\x1fSelect field')
print('\0message\x1fReturn | Copy field')
print('{}\0icon\x1f{}\x1finfo\x1f{}', 'TODO', 'folder', 'single-item:todo')
# TODO: not sure how to proceed here. currently pressing return leads to the "copy password" action
# one option is to pass the action in the info and parsing that
else:
print(f'Unknown rofi_retv: {rofi_retv}')
sys.exit(2)
sys.exit(0)
def handle_history_get(index, field, print_result=False):
logger = logging.getLogger()
history = get_history()
if index >= len(history):
print(f'Trying to get history element {index}, but the history only contains {len(history)} elements')
sys.exit(1)
item_id = history[index]
try:
bw = bitwarden.Bitwarden()
item = bw.get_item_by_id(item_id)
except Exception:
logger.exception(f'Failed to get Bitwarden item with id "{item_id}". It might not exist anymore.')
sys.exit(1)
login = item.get('login')
if login is None:
logger.error(f'The Bitwarden item with id "{item_id}" does not have a login attribute.')
sys.exit(2)
result = None
if field == 'title':
collections = get_bw_collection_id2name_map()
folders = get_bw_folders_id2name_map()
title = get_title(item, folders, collections)
result = title
elif field in login:
result = login[field]
else:
print(f"Could not find a field named '{field}'")
sys.exit(1)
if print_result:
print(result)
else:
save_to_clipboard(result)
def main():
"""The main function. Hier spielt die Musik.
"""
logging_format = "[%(filename)s->%(funcName)s:%(lineno)s] %(message)s"
logging.basicConfig(
format=logging_format,
# level=logging.DEBUG,
)
logger = logging.getLogger()
logger.debug(sys.argv)
args = parse_args()
if args.COMMAND == 'select':
handle_select()
elif args.COMMAND == 'history':
if args.HISTORY_COMMAND == 'get':
handle_history_get(args.HISTORY_GET_INDEX, args.HISTORY_GET_FIELD, args.HISTORY_GET_PRINT)
if __name__ == '__main__':
main()

531
src/main.rs Normal file
View File

@ -0,0 +1,531 @@
use clap::{Parser, Subcommand};
use itertools::Itertools;
use log::{debug, error, info};
use serde::{Serialize, Deserialize};
use std::env;
use std::fs;
use std::io::Read;
use std::io::Write;
use std::process::Stdio;
use std::{process, collections::HashMap};
use crate::bitwarden_api::model;
mod bitwarden_api;
// GOALS:
// * `search` item via dmenu / rofi / cli
// * `search --mark` parameter that remembers the marked entry
// * `mark get {field}` returns the field of the marked element (id, title, folder, username, password, etc)
// * "scroll" through marked history, mark as active again
// * automatically execute `export BW_SESSION="$(bw unlock --raw)" && bw serve --hostname 127.0.0.1&` if required
// * unlock with system keyring if possible
// * autotype?
#[derive(Parser)]
#[command(author, version, about, long_about = None)] // Read from `Cargo.toml`
struct Cli {
#[command(subcommand)]
command: CliCommands,
}
#[derive(Subcommand)]
enum CliCommands {
Select {
title: Option<String>,
},
History {
#[command(subcommand)]
command: HistoryCommands
},
}
#[derive(Subcommand)]
enum HistoryCommands {
List {},
Get {
index: usize,
field: String,
},
}
#[derive(Serialize, Deserialize)]
struct History {
history: Vec<String>,
}
fn main() {
env_logger::init();
let cli = Cli::parse();
match &cli.command {
CliCommands::Select{title: _} => {
// the title is always ignored, we want the id which we get via the environment variables
debug!("Running in select mode");
handle_select();
}
CliCommands::History { command } => match &command {
HistoryCommands::List {} => {
debug!("Running history list");
handle_history_list();
}
HistoryCommands::Get {index, field} => {
debug!("Running history get");
handle_history_get(*index, field);
}
}
}
}
fn get_history() -> Vec<String> {
let file_path = "/tmp/bw-menu-history.json"; // TODO: use XDG paths
let read_file = fs::OpenOptions::new()
.read(true)
.open(file_path);
let json_str = match read_file {
Ok(mut file) => {
let mut content = String::new();
file.read_to_string(&mut content).expect("Failed to read file");
content
}
Err(_) => String::new(), // Return an empty string if the file cannot be opened
};
// Deserialize JSON string back into a Rust struct
let history: History = serde_json::from_str(&json_str).unwrap_or_else(|_| {
// Return a default Person with an empty history if deserialization fails
History {
history: Vec::new(),
}
});
history.history
}
fn save_history(entries: Vec<String>) {
let history = History {
history: entries,
};
let json_str = serde_json::to_string(&history).expect("Failed to serialize to JSON!");
let file_path = "/tmp/bw-menu-history.json"; // TODO: use XDG paths
// Write the JSON string to a file, overwriting it if it already exists
let mut file = fs::OpenOptions::new()
.write(true)
.truncate(true) // Ensure the file is truncated before writing
.create(true)
.open(file_path)
.expect("Failed to open file for writing!");
file.write_all(json_str.as_bytes()).expect("Failed to write to file!");
}
const MAX_HISTORY_LEN: usize = 5; // TODO: make configurable
fn add_history(entry: &String) {
let mut history = get_history();
history.insert(0, entry.to_string());
if history.len() > MAX_HISTORY_LEN {
history = history[0..MAX_HISTORY_LEN].to_vec();
}
save_history(history);
}
// the flow is:
// 1. bw-menu starts rofi in the script mode
// 2. rofi calls bw-menu calls bw-menu to get a list of items to display
// 3. the user interacts with the list
// 4. rofi sets environment variables, and calls bw-menu with the title of the selected item
// 5. bw-menu acts based on the environment variables
//
// currently using the script mode so that we can pass additional info (eg the id).
// no other mode supports this, meaning we would need to match based on the title
fn handle_select() {
let rofi_retv = env::var("ROFI_RETV");
let rofi_info = env::var("ROFI_INFO");
dbg!(&rofi_retv);
dbg!(&rofi_info);
let executable_path = env::current_exe().unwrap_or_else(|err| {
error!("Failed to get the path to the executable: {:?}", err);
process::exit(2);
});
match rofi_retv {
Err(_) => {
// this means rofi is not running yet, so let's start it
debug!("starting initial rofi process");
let _ = process::Command::new("rofi")
.arg("-show")
.arg("bw")
.arg("-modes")
.arg(format!("bw:{} select", &executable_path.to_string_lossy()))
.arg("-matching")
.arg("fuzzy")
.arg("-sorting-method")
.arg("fzf")
.arg("-sort")
// we need to remap some default so that we can use the keys for custom binds
.arg("-kb-accept-alt")
.arg("F12")
.arg("-kb-accept-custom")
.arg("F11")
// set our custom keybinds
.arg("-kb-custom-1")
.arg("Shift+Return")
.arg("-kb-custom-2")
.arg("Control+Return")
.spawn()
.expect("Failed to spawn child process"); // TODO: better error handling
}
Ok(_) => {
// this means we are being called by rofi as a custom script
// either generate the list, or act on input
let rofi_retv = rofi_retv.unwrap_or_default().to_string().parse::<i32>().unwrap_or(0);
let rofi_info = rofi_info.unwrap_or_default();
if rofi_retv == 0 {
// 0: Initial call of script
generate_rofi_list(get_bitwarden_logins());
} else {
let item = match crate::bitwarden_api::object_item(&rofi_info) {
Ok(response) => response,
Err(msg) => {
error!("Failed to get Bitwarden item with id {}:\n{}", rofi_info, msg);
process::exit(1);
}
};
add_history(&rofi_info);
// 1: Selected an entry
if rofi_retv == 1 {
info!("copy password");
if let Some(login) = item.data.login {
if let Some(password) = login.password {
save_to_clipboard(password);
}
}
// 10-28: Custom keybinding 1-19
} else if rofi_retv == 10 {
info!("copy username");
if let Some(login) = item.data.login {
if let Some(username) = login.username {
save_to_clipboard(username);
}
}
} else if rofi_retv == 11 {
info!("show item");
println!("\0prompt\x1fSelect field");
println!("\0message\x1fReturn | Copy field");
println!("{}\0icon\x1f{}\x1finfo\x1f{}", "TODO", "folder", "single-item:todo");
// TODO: not sure how to proceed here. currently pressing return leads to the "copy password" action
// one option is to pass the action in the info and parsing that
// if let Some(login) = item.data.login {
// if let Some(username) = login.username {
// println!("Username: {}", username);
// }
// }
}
// we are done here
process::exit(0);
}
}
}
}
fn get_bitwarden_logins() -> HashMap<String, model::response::ListObjectItemsDataData> {
match crate::bitwarden_api::status() {
Ok(response) => {
if response.data.template.status != "unlocked" {
error!("Vault is {}, should be unlocked!", response.data.template.status);
process::exit(1);
}
}
Err(msg) => {
error!("Failed to get Bitwarden status:\n{msg}");
process::exit(1);
}
}
match crate::bitwarden_api::sync() {
Ok(_) => info!("Sync'd Bitwarden Vault."),
Err(msg) => {
error!("Failed to sync Bitwarden Vault:\n{msg}");
process::exit(1);
}
}
let items = match crate::bitwarden_api::list_object_items() {
Ok(response) => {
info!("Got list of Bitwarden items.");
response.data.data
}
Err(msg) => {
error!("Failed to get list of Bitwarden items:\n{msg}");
process::exit(1);
}
};
let collections: HashMap<String, String> = match crate::bitwarden_api::list_object_collections() {
Ok(response) => {
info!("Got list of Bitwarden collections.");
response.data.data.iter()
.map(|item| (item.id.clone(), item.name.clone()))
.collect()
}
Err(msg) => {
error!("Failed to get list of Bitwarden collections:\n{msg}");
process::exit(1);
}
};
let folders: HashMap<String, String> = match crate::bitwarden_api::list_object_folders() {
Ok(response) => {
info!("Got list of Bitwarden folders.");
response.data.data.iter()
.filter(|item| item.id.is_some())
.map(|item| (item.id.as_ref().unwrap().clone(), item.name.clone()))
.collect()
}
Err(msg) => {
error!("Failed to get list of Bitwarden folders:\n{msg}");
process::exit(1);
}
};
let mut logins: HashMap<String, model::response::ListObjectItemsDataData> = HashMap::new();
for item in items {
if item.login.is_some() {
let title = get_title(&item, &folders, &collections);
debug!("Login: {}", title);
logins.insert(title, item);
}
// break;
}
// dbg!(&logins);
logins
}
fn generate_rofi_list(logins: HashMap<String, model::response::ListObjectItemsDataData>) {
println!("\0prompt\x1fSelect item");
println!("\0message\x1fReturn | Copy password\rShift+Return | Copy username or selected field\rControl+Return | Show item");
println!("\0use-hot-keys\x1ftrue");
println!("\0no-custom\x1ftrue");
println!("\0markup-rows\x1ftrue");
for title in logins.keys().sorted() {
print!("{title}");
let icon = "folder"; // TODO: what icons are available? ~/.icons seems to work
println!("\0icon\x1f{}\x1finfo\x1f{}", icon, logins[title].id);
}
}
fn get_title(item: &model::response::ListObjectItemsDataData, folders: &HashMap<String, String>, collections: &HashMap<String, String>) -> String {
let mut title = String::new();
let login = item.login.as_ref().unwrap();
if item.folderId.is_some() {
title.push_str(&format!("{}/", folders[item.folderId.as_ref().unwrap()]));
}
// TODO: how do we handle multiple collections nicely? i dont like the pipe for searching. my
// fav solution right now is to display the entry multiple times, eg "000/000-p-xca01", "001/000-p-xca01"
for collection_id in &item.collectionIds {
title.push_str(&collections[collection_id]);
}
if !item.collectionIds.is_empty() {
title.push('/');
}
if title.is_empty() {
title.push_str("ungrouped/");
}
title.push_str(&item.name);
if login.username.is_some() {
title.push_str(&format!(" ({})", login.username.as_ref().unwrap()));
}
title
}
fn save_to_clipboard(input: String) {
// using xclip blocks the app. maybe piping the output to /dev/null would fix this
// let mut cmd = process::Command::new("xclip")
// .arg("-in")
// .arg("-selection")
// .arg("clipboard")
// .stdin(Stdio::piped())
// .spawn()
// .expect("Failed to spawn child process");
let mut cmd = process::Command::new("xsel")
.arg("--input")
.arg("--clipboard")
.stdin(Stdio::piped())
.spawn()
.expect("Failed to spawn child process"); // TODO: better error handling
let mut stdin = cmd.stdin.take().expect("Failed to open stdin.");
let child_thread = std::thread::spawn(move || {
stdin.write_all(input.as_bytes()).expect("Failed to write to stdin");
});
let result = child_thread.join();
match result {
Ok(_) => debug!("Child thread completed successfully"),
Err(e) => error!("Child thread encountered an error: {:?}", e),
}
}
fn handle_history_list() {
// TODO: not sure if we need this, would make more sense to just show these entries at the top
// of the normal selector
let history = get_history();
let collections: HashMap<String, String> = match crate::bitwarden_api::list_object_collections() {
Ok(response) => {
info!("Got list of Bitwarden collections.");
response.data.data.iter()
.map(|item| (item.id.clone(), item.name.clone()))
.collect()
}
Err(msg) => {
error!("Failed to get list of Bitwarden collections:\n{msg}");
process::exit(1);
}
};
let folders: HashMap<String, String> = match crate::bitwarden_api::list_object_folders() {
Ok(response) => {
info!("Got list of Bitwarden folders.");
response.data.data.iter()
.filter(|item| item.id.is_some())
.map(|item| (item.id.as_ref().unwrap().clone(), item.name.clone()))
.collect()
}
Err(msg) => {
error!("Failed to get list of Bitwarden folders:\n{msg}");
process::exit(1);
}
};
let mut logins: HashMap<String, model::response::ListObjectItemsDataData> = HashMap::new();
for id in history {
dbg!(&id);
let item = match crate::bitwarden_api::object_item(&id) {
Ok(response) => response.data,
Err(msg) => {
error!("Failed to get Bitwarden item with id {}. It might not exist anymore.\n{}", id, msg);
continue;
}
};
if item.login.is_some() {
let title = get_title(&item, &folders, &collections);
debug!("Login: {}", title);
logins.insert(title, item);
}
}
dbg!(&logins);
}
fn handle_history_get(index: usize, field: &String) {
let history = get_history();
if index >= history.len() {
eprintln!("Trying to get history element {}, but the history only contains {} elements", index, history.len());
process::exit(1);
}
let id = &history[index];
let item = match crate::bitwarden_api::object_item(&id) {
Ok(response) => response.data,
Err(msg) => {
error!("Failed to get Bitwarden item with id {}. It might not exist anymore.\n{}", id, msg);
process::exit(1);
}
};
// dbg!(&item.login);
// dbg!(&field);
match item.login {
Some(ref login) => {
match field.as_ref() {
"uris" => {
if let Some(uris) = &login.uris {
println!("{:?}", uris);
}
}
"username" => {
if let Some(username) = &login.username {
println!("{}", username);
}
}
"password" => {
if let Some(password) = &login.password {
println!("{}", password);
}
}
"totp" => {
if let Some(totp) = &login.totp {
println!("{}", totp);
}
}
"title" => {
let collections: HashMap<String, String> = match crate::bitwarden_api::list_object_collections() {
Ok(response) => {
info!("Got list of Bitwarden collections.");
response.data.data.iter()
.map(|item| (item.id.clone(), item.name.clone()))
.collect()
}
Err(msg) => {
error!("Failed to get list of Bitwarden collections:\n{msg}");
process::exit(1);
}
};
let folders: HashMap<String, String> = match crate::bitwarden_api::list_object_folders() {
Ok(response) => {
info!("Got list of Bitwarden folders.");
response.data.data.iter()
.filter(|item| item.id.is_some())
.map(|item| (item.id.as_ref().unwrap().clone(), item.name.clone()))
.collect()
}
Err(msg) => {
error!("Failed to get list of Bitwarden folders:\n{msg}");
process::exit(1);
}
};
let title = get_title(&item, &folders, &collections);
println!("{}", title);
}
_ => {
eprintln!("Could not find a field named '{}'", field);
process::exit(1);
}
}
}
None => {
error!("Login of Bitwarden item is none");
process::exit(2);
}
}
}