/ lookup_plugins / bitwarden.py
bitwarden.py
1 #!/usr/bin/env python 2 3 # (c) 2018, Matt Stofko <matt@mjslabs.com> 4 # GNU General Public License v3.0+ (see LICENSE or 5 # https://www.gnu.org/licenses/gpl-3.0.txt) 6 # 7 # This plugin can be run directly by specifying the field followed by a list of 8 # entries, e.g. bitwarden.py password google.com wufoo.com 9 # 10 # This version includes fixes that can be found in this fork: 11 # https://github.com/status-im/ansible-modules-bitwarden 12 # 13 from __future__ import (absolute_import, division, print_function) 14 __metaclass__ = type 15 16 import json 17 import os 18 import sys 19 20 from shutil import which 21 from subprocess import Popen, PIPE, STDOUT, check_output 22 23 from ansible.errors import AnsibleError 24 from ansible.plugins.lookup import LookupBase 25 26 try: 27 from __main__ import display 28 except ImportError: 29 from ansible.utils.display import Display 30 display = Display() 31 32 33 DOCUMENTATION = """ 34 lookup: bitwarden 35 author: 36 - Matt Stofko <matt@mjslabs.com> 37 requirements: 38 - bw (command line utility) 39 - BW_SESSION environment var (from `bw login` or `bw unlock`) 40 short_description: look up data from a bitwarden vault 41 description: 42 - use the bw command line utility to grab one or more items stored in a 43 bitwarden vault 44 options: 45 _terms: 46 description: name of item that contains the field to fetch 47 required: true 48 field: 49 description: field to return from bitwarden 50 default: 'password' 51 sync: 52 description: If True, call `bw sync` before lookup 53 """ 54 55 EXAMPLES = """ 56 - name: get 'username' from Bitwarden entry 'Google' 57 debug: 58 msg: "{{ lookup('bitwarden', 'Google', field='username') }}" 59 """ 60 61 RETURN = """ 62 _raw: 63 description: 64 - Items from Bitwarden vault 65 """ 66 67 68 class Bitwarden(object): 69 def __init__(self, path): 70 self._cli_path = path 71 self._bw_session = "" 72 if which("bw") is None: 73 raise AnsibleError("Command not found: {0}".format(self._cli_path)) 74 75 @property 76 def session(self): 77 return self._bw_session 78 79 @session.setter 80 def session(self, value): 81 self._bw_session = value 82 83 @property 84 def cli_path(self): 85 return self._cli_path 86 87 def _run(self, args): 88 my_env = os.environ.copy() 89 if self.session != "": 90 my_env["BW_SESSION"] = self.session 91 p = Popen([self.cli_path] + args, stdin=PIPE, 92 stdout=PIPE, stderr=STDOUT, env=my_env) 93 out, _ = p.communicate() 94 out = out.decode() 95 rc = p.wait() 96 if rc != 0: 97 display.debug("Received error when running '{0} {1}': {2}" 98 .format(self.cli_path, args, out)) 99 if out.startswith("Vault is locked."): 100 raise AnsibleError("Error accessing Bitwarden vault. " 101 "Run 'bw unlock' to unlock the vault.") 102 elif out.startswith("You are not logged in."): 103 raise AnsibleError("Error accessing Bitwarden vault. " 104 "Run 'bw login' to login.") 105 elif out.startswith("Failed to decrypt."): 106 raise AnsibleError("Error accessing Bitwarden vault. " 107 "Make sure BW_SESSION is set properly.") 108 elif out.startswith("More than one result was found."): 109 raise AnsibleError("More than one object found with this name.") 110 elif out.startswith("Not found."): 111 raise AnsibleError("Error accessing Bitwarden vault. " 112 "Specified item not found: {}".format(args[-1])) 113 else: 114 print("Unknown failure in 'bw' command: \n%s" % out) 115 return None 116 return out.strip() 117 118 def sync(self): 119 self._run(['sync']) 120 121 def get_entry(self, key, field): 122 return self._run(["get", field, key]) 123 124 def get_item(self, key): 125 return json.loads(self.get_entry(key, 'item')) 126 127 def get_notes(self, key): 128 return self.get_item(key).get('notes') 129 130 def get_custom_field(self, key, field): 131 rval = self.get_entry(key, 'item') 132 data = json.loads(rval) 133 if 'fields' not in data: 134 return None 135 matching = [x for x in data['fields'] if x['name'] == field] 136 if len(matching) == 0: 137 return None 138 return matching[0]['value'] 139 140 def get_itemid(self, key): 141 return self.get_item(key).get('id') 142 143 def get_attachments(self, key, itemid): 144 return self._run(['get', 'attachment', key, '--itemid={}'.format(itemid), '--raw']) 145 146 147 class LookupModule(LookupBase): 148 149 def run(self, terms, variables=None, **kwargs): 150 self.bw = Bitwarden(path=kwargs.get('path', 'bw')) 151 152 if kwargs.get('sync'): 153 self.bw.sync() 154 if kwargs.get('session'): 155 self.bw.session = kwargs.get('session') 156 157 values = [] 158 for term in terms: 159 rval = self.lookup(term, kwargs) 160 if rval is None: 161 raise AnsibleError("No matching term, field or attachment found!") 162 values.append(rval) 163 164 return values 165 166 def lookup(self, term, kwargs): 167 if 'file' in kwargs: 168 # Try attachments first 169 itemid = self.bw.get_itemid(term) 170 if itemid is None: 171 raise AnsibleError("No such object, wrong name") 172 return self.bw.get_attachments(kwargs['file'], itemid) 173 174 # By default check password 175 field = kwargs.get('field', 'password') 176 177 # Special field which contains notes. 178 if field == 'notes': 179 return self.bw.get_notes(term) 180 181 # Try custom fields second 182 val = self.bw.get_custom_field(term, field) 183 if val: 184 return val 185 186 # If not found check default bw entries 187 return self.bw.get_entry(term, field) 188 189 def main(): 190 if len(sys.argv) < 3: 191 print("Usage: %s <field> <name> [name name ...]" % os.path.basename(__file__)) 192 return -1 193 194 print(LookupModule().run(sys.argv[2:], variables=None, field=sys.argv[1])) 195 196 return 0 197 198 199 if __name__ == "__main__": 200 sys.exit(main())