/ lookup_plugins / bitwarden.py
bitwarden.py
  1  #!/usr/bin/env python
  2  
  3  # (c) 2018, Matt Stofko <matt@mjslabs.com>
  4  # GNU General Public License v3.0+ (see LICENSE or
  5  # https://www.gnu.org/licenses/gpl-3.0.txt)
  6  #
  7  # This plugin can be run directly by specifying the field followed by a list of
  8  # entries, e.g.  bitwarden.py password google.com wufoo.com
  9  #
 10  # This version includes fixes that can be found in this fork:
 11  # https://github.com/status-im/ansible-modules-bitwarden
 12  #
 13  from __future__ import (absolute_import, division, print_function)
 14  __metaclass__ = type
 15  
 16  import json
 17  import os
 18  import sys
 19  
 20  from shutil import which
 21  from subprocess import Popen, PIPE, STDOUT, check_output
 22  
 23  from ansible.errors import AnsibleError
 24  from ansible.plugins.lookup import LookupBase
 25  
 26  try:
 27      from __main__ import display
 28  except ImportError:
 29      from ansible.utils.display import Display
 30      display = Display()
 31  
 32  
 33  DOCUMENTATION = """
 34  lookup: bitwarden
 35  author:
 36    - Matt Stofko <matt@mjslabs.com>
 37  requirements:
 38    - bw (command line utility)
 39    - BW_SESSION environment var (from `bw login` or `bw unlock`)
 40  short_description: look up data from a bitwarden vault
 41  description:
 42    - use the bw command line utility to grab one or more items stored in a
 43      bitwarden vault
 44  options:
 45    _terms:
 46      description: name of item that contains the field to fetch
 47      required: true
 48  field:
 49    description: field to return from bitwarden
 50    default: 'password'
 51  sync:
 52    description: If True, call `bw sync` before lookup
 53  """
 54  
 55  EXAMPLES = """
 56  - name: get 'username' from Bitwarden entry 'Google'
 57    debug:
 58      msg: "{{ lookup('bitwarden', 'Google', field='username') }}"
 59  """
 60  
 61  RETURN = """
 62    _raw:
 63      description:
 64        - Items from Bitwarden vault
 65  """
 66  
 67  
 68  class Bitwarden(object):
 69      def __init__(self, path):
 70          self._cli_path = path
 71          self._bw_session = ""
 72          if which("bw") is None:
 73              raise AnsibleError("Command not found: {0}".format(self._cli_path))
 74  
 75      @property
 76      def session(self):
 77          return self._bw_session
 78  
 79      @session.setter
 80      def session(self, value):
 81          self._bw_session = value
 82  
 83      @property
 84      def cli_path(self):
 85          return self._cli_path
 86  
 87      def _run(self, args):
 88          my_env = os.environ.copy()
 89          if self.session != "":
 90              my_env["BW_SESSION"] = self.session
 91          p = Popen([self.cli_path] + args, stdin=PIPE,
 92                    stdout=PIPE, stderr=STDOUT, env=my_env)
 93          out, _ = p.communicate()
 94          out = out.decode()
 95          rc = p.wait()
 96          if rc != 0:
 97              display.debug("Received error when running '{0} {1}': {2}"
 98                            .format(self.cli_path, args, out))
 99              if out.startswith("Vault is locked."):
100                  raise AnsibleError("Error accessing Bitwarden vault. "
101                                     "Run 'bw unlock' to unlock the vault.")
102              elif out.startswith("You are not logged in."):
103                  raise AnsibleError("Error accessing Bitwarden vault. "
104                                     "Run 'bw login' to login.")
105              elif out.startswith("Failed to decrypt."):
106                  raise AnsibleError("Error accessing Bitwarden vault. "
107                                     "Make sure BW_SESSION is set properly.")
108              elif out.startswith("More than one result was found."):
109                  raise AnsibleError("More than one object found with this name.")
110              elif out.startswith("Not found."):
111                  raise AnsibleError("Error accessing Bitwarden vault. "
112                                     "Specified item not found: {}".format(args[-1]))
113              else:
114                  print("Unknown failure in 'bw' command: \n%s" % out)
115                  return None
116          return out.strip()
117  
118      def sync(self):
119          self._run(['sync'])
120  
121      def get_entry(self, key, field):
122          return self._run(["get", field, key])
123  
124      def get_item(self, key):
125          return json.loads(self.get_entry(key, 'item'))
126  
127      def get_notes(self, key):
128          return self.get_item(key).get('notes')
129  
130      def get_custom_field(self, key, field):
131          rval = self.get_entry(key, 'item')
132          data = json.loads(rval)
133          if 'fields' not in data:
134              return None
135          matching = [x for x in data['fields'] if x['name'] == field]
136          if len(matching) == 0:
137              return None
138          return matching[0]['value']
139  
140      def get_itemid(self, key):
141          return self.get_item(key).get('id')
142  
143      def get_attachments(self, key, itemid):
144          return self._run(['get', 'attachment', key, '--itemid={}'.format(itemid), '--raw'])
145  
146  
147  class LookupModule(LookupBase):
148  
149      def run(self, terms, variables=None, **kwargs):
150          self.bw = Bitwarden(path=kwargs.get('path', 'bw'))
151  
152          if kwargs.get('sync'):
153              self.bw.sync()
154          if kwargs.get('session'):
155              self.bw.session = kwargs.get('session')
156  
157          values = []
158          for term in terms:
159              rval = self.lookup(term, kwargs)
160              if rval is None:
161                  raise AnsibleError("No matching term, field or attachment found!")
162              values.append(rval)
163  
164          return values
165  
166      def lookup(self, term, kwargs):
167          if 'file' in kwargs:
168              # Try attachments first
169              itemid = self.bw.get_itemid(term)
170              if itemid is None:
171                  raise AnsibleError("No such object, wrong name")
172              return self.bw.get_attachments(kwargs['file'], itemid)
173  
174          # By default check password
175          field = kwargs.get('field', 'password')
176  
177          # Special field which contains notes.
178          if field == 'notes':
179              return self.bw.get_notes(term)
180  
181          # Try custom fields second
182          val = self.bw.get_custom_field(term, field)
183          if val:
184              return val
185  
186          # If not found check default bw entries
187          return self.bw.get_entry(term, field)
188  
189  def main():
190      if len(sys.argv) < 3:
191          print("Usage: %s <field> <name> [name name ...]" % os.path.basename(__file__))
192          return -1
193  
194      print(LookupModule().run(sys.argv[2:], variables=None, field=sys.argv[1]))
195  
196      return 0
197  
198  
199  if __name__ == "__main__":
200      sys.exit(main())