auto_analyse_raw_data.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. #!/usr/bin/python
  2. """Attempt an automatic analysis of IRremoteESP8266's Raw data output.
  3. Makes suggestions on key values and tried to break down the message
  4. into likely chunks."""
  5. #
  6. # Copyright 2018 David Conran
  7. import argparse
  8. import sys
  9. class RawIRMessage(object):
  10. """Basic analyse functions & structure for raw IR messages."""
  11. # pylint: disable=too-many-instance-attributes
  12. def __init__(self, margin, timings, output=sys.stdout, verbose=True):
  13. self.hdr_mark = None
  14. self.hdr_space = None
  15. self.bit_mark = None
  16. self.zero_space = None
  17. self.one_space = None
  18. self.gaps = []
  19. self.margin = margin
  20. self.marks = []
  21. self.mark_buckets = {}
  22. self.spaces = []
  23. self.space_buckets = {}
  24. self.output = output
  25. self.verbose = verbose
  26. if len(timings) <= 3:
  27. raise ValueError("Too few message timings supplied.")
  28. self.timings = timings
  29. self._generate_timing_candidates()
  30. self._calc_values()
  31. def _generate_timing_candidates(self):
  32. """Determine the likely values from the given data."""
  33. count = 0
  34. for usecs in self.timings:
  35. count = count + 1
  36. if count % 2:
  37. self.marks.append(usecs)
  38. else:
  39. self.spaces.append(usecs)
  40. self.marks, self.mark_buckets = self.reduce_list(self.marks)
  41. self.spaces, self.space_buckets = self.reduce_list(self.spaces)
  42. def reduce_list(self, items):
  43. """Reduce a list of numbers into buckets that are at least margin apart."""
  44. result = []
  45. last = -1
  46. buckets = {}
  47. for item in sorted(items, reverse=True):
  48. if last == -1 or item < last - self.margin:
  49. result.append(item)
  50. last = item
  51. buckets[last] = [item]
  52. else:
  53. buckets[last].append(item)
  54. return result, buckets
  55. def _usec_compare(self, seen, expected):
  56. """Compare two usec values and see if they match within a
  57. subtractive margin."""
  58. return seen <= expected and seen > expected - self.margin
  59. def _usec_compares(self, usecs, expecteds):
  60. """Compare a usec value to a list of values and return True
  61. if they are within a subtractive margin."""
  62. for expected in expecteds:
  63. if self._usec_compare(usecs, expected):
  64. return True
  65. return False
  66. def display_binary(self, binary_str):
  67. """Display common representations of the suppied binary string."""
  68. num = int(binary_str, 2)
  69. bits = len(binary_str)
  70. rev_binary_str = binary_str[::-1]
  71. rev_num = int(rev_binary_str, 2)
  72. self.output.write("\n Bits: %d\n"
  73. " Hex: %s (MSB first)\n"
  74. " %s (LSB first)\n"
  75. " Dec: %s (MSB first)\n"
  76. " %s (LSB first)\n"
  77. " Bin: 0b%s (MSB first)\n"
  78. " 0b%s (LSB first)\n" %
  79. (bits, "0x{0:0{1}X}".format(num, bits / 4),
  80. "0x{0:0{1}X}".format(rev_num, bits / 4), num, rev_num,
  81. binary_str, rev_binary_str))
  82. def add_data_code(self, bin_str, footer=True):
  83. """Add the common "data" sequence of code to send the bulk of a message."""
  84. # pylint: disable=no-self-use
  85. code = []
  86. code.append(" // Data")
  87. code.append(" // e.g. data = 0x%X, nbits = %d" % (int(bin_str, 2),
  88. len(bin_str)))
  89. code.append(" sendData(kBitMark, kOneSpace, kBitMark, kZeroSpace, data, "
  90. "nbits, true);")
  91. if footer:
  92. code.append(" // Footer")
  93. code.append(" mark(kBitMark);")
  94. return code
  95. def _calc_values(self):
  96. """Calculate the values which describe the standard timings
  97. for the protocol."""
  98. if self.verbose:
  99. self.output.write("Potential Mark Candidates:\n"
  100. "%s\n"
  101. "Potential Space Candidates:\n"
  102. "%s\n" % (str(self.marks), str(self.spaces)))
  103. # Largest mark is likely the kHdrMark
  104. self.hdr_mark = self.marks[0]
  105. # The bit mark is likely to be the smallest mark.
  106. self.bit_mark = self.marks[-1]
  107. if self.is_space_encoded() and len(self.spaces) >= 3:
  108. if self.verbose and len(self.marks) > 2:
  109. self.output.write("DANGER: Unexpected and unused mark timings!")
  110. # We should have 3 space candidates at least.
  111. # They should be: zero_space (smallest), one_space, & hdr_space (largest)
  112. spaces = list(self.spaces)
  113. self.zero_space = spaces.pop()
  114. self.one_space = spaces.pop()
  115. self.hdr_space = spaces.pop()
  116. # Rest are probably message gaps
  117. self.gaps = spaces
  118. def is_space_encoded(self):
  119. """Make an educated guess if the message is space encoded."""
  120. return len(self.spaces) > len(self.marks)
  121. def is_hdr_mark(self, usec):
  122. """Is usec the header mark?"""
  123. return self._usec_compare(usec, self.hdr_mark)
  124. def is_hdr_space(self, usec):
  125. """Is usec the header space?"""
  126. return self._usec_compare(usec, self.hdr_space)
  127. def is_bit_mark(self, usec):
  128. """Is usec the bit mark?"""
  129. return self._usec_compare(usec, self.bit_mark)
  130. def is_one_space(self, usec):
  131. """Is usec the one space?"""
  132. return self._usec_compare(usec, self.one_space)
  133. def is_zero_space(self, usec):
  134. """Is usec the zero_space?"""
  135. return self._usec_compare(usec, self.zero_space)
  136. def is_gap(self, usec):
  137. """Is usec the a space gap?"""
  138. return self._usec_compares(usec, self.gaps)
  139. def avg_list(items):
  140. """Return the average of a list of numbers."""
  141. if items:
  142. return sum(items) / len(items)
  143. return 0
  144. def add_bit(so_far, bit, output=sys.stdout):
  145. """Add a bit to the end of the bits collected so far."""
  146. if bit == "reset":
  147. return ""
  148. output.write(str(bit)) # This effectively displays in LSB first order.
  149. return so_far + str(bit) # Storing it in MSB first order.
  150. def convert_rawdata(data_str):
  151. """Parse a C++ rawdata declaration into a list of values."""
  152. start = data_str.find('{')
  153. end = data_str.find('}')
  154. if end == -1:
  155. end = len(data_str)
  156. if start > end:
  157. raise ValueError("Raw Data not parsible due to parentheses placement.")
  158. data_str = data_str[start + 1:end]
  159. results = []
  160. for timing in [x.strip() for x in data_str.split(',')]:
  161. try:
  162. results.append(int(timing))
  163. except ValueError:
  164. raise ValueError(
  165. "Raw Data contains a non-numeric value of '%s'." % timing)
  166. return results
  167. def dump_constants(message, defines, output=sys.stdout):
  168. """Dump the key constants and generate the C++ #defines."""
  169. hdr_mark = avg_list(message.mark_buckets[message.hdr_mark])
  170. bit_mark = avg_list(message.mark_buckets[message.bit_mark])
  171. hdr_space = avg_list(message.space_buckets[message.hdr_space])
  172. one_space = avg_list(message.space_buckets[message.one_space])
  173. zero_space = avg_list(message.space_buckets[message.zero_space])
  174. output.write("Guessing key value:\n"
  175. "kHdrMark = %d\n"
  176. "kHdrSpace = %d\n"
  177. "kBitMark = %d\n"
  178. "kOneSpace = %d\n"
  179. "kZeroSpace = %d\n" % (hdr_mark, hdr_space, bit_mark, one_space,
  180. zero_space))
  181. defines.append("const uint16_t kHdrMark = %d;" % hdr_mark)
  182. defines.append("const uint16_t kBitMark = %d;" % bit_mark)
  183. defines.append("const uint16_t kHdrSpace = %d;" % hdr_space)
  184. defines.append("const uint16_t kOneSpace = %d;" % one_space)
  185. defines.append("const uint16_t kZeroSpace = %d;" % zero_space)
  186. avg_gaps = [avg_list(message.space_buckets[x]) for x in message.gaps]
  187. if len(message.gaps) == 1:
  188. output.write("kSpaceGap = %d\n" % avg_gaps[0])
  189. defines.append("const uint16_t kSpaceGap = %d;" % avg_gaps[0])
  190. else:
  191. count = 0
  192. for gap in avg_gaps:
  193. # We probably (still) have a gap in the protocol.
  194. count = count + 1
  195. output.write("kSpaceGap%d = %d\n" % (count, gap))
  196. defines.append("const uint16_t kSpaceGap%d = %d;" % (count, gap))
  197. def parse_and_report(rawdata_str, margin, gen_code=False, output=sys.stdout):
  198. """Analyse the rawdata c++ definition of a IR message."""
  199. defines = []
  200. function_code = []
  201. # Parse the input.
  202. rawdata = convert_rawdata(rawdata_str)
  203. output.write("Found %d timing entries.\n" % len(rawdata))
  204. message = RawIRMessage(margin, rawdata, output)
  205. output.write("\nGuessing encoding type:\n")
  206. if message.is_space_encoded():
  207. output.write("Looks like it uses space encoding. Yay!\n\n")
  208. dump_constants(message, defines, output)
  209. else:
  210. output.write("Sorry, it looks like it is Mark encoded. "
  211. "I can't do that yet. Exiting.\n")
  212. sys.exit(1)
  213. total_bits = decode_data(message, defines, function_code, output)
  214. if gen_code:
  215. generate_irsend_code(defines, function_code, total_bits, output)
  216. def decode_data(message, defines, function_code, output=sys.stdout):
  217. """Decode the data sequence with the given values in mind."""
  218. # pylint: disable=too-many-branches,too-many-statements
  219. # Now we have likely candidates for the key values, go through the original
  220. # sequence and break it up and indicate accordingly.
  221. output.write("\nDecoding protocol based on analysis so far:\n\n")
  222. state = ""
  223. count = 1
  224. total_bits = ""
  225. binary_value = add_bit("", "reset")
  226. function_code.extend([
  227. "// Function should be safe up to 64 bits.",
  228. "void IRsend::sendXyz(const uint64_t data, const uint16_t"
  229. " nbits, const uint16_t repeat) {",
  230. " enableIROut(38); // A guess. Most common frequency.",
  231. " for (uint16_t r = 0; r <= repeat; r++) {"
  232. ])
  233. for usec in message.timings:
  234. if (message.is_hdr_mark(usec) and count % 2 and
  235. not message.is_bit_mark(usec)):
  236. state = "HM"
  237. if binary_value:
  238. message.display_binary(binary_value)
  239. function_code.extend(message.add_data_code(binary_value, False))
  240. total_bits = total_bits + binary_value
  241. binary_value = add_bit(binary_value, "reset")
  242. output.write("kHdrMark+")
  243. function_code.extend([" // Header", " mark(kHdrMark);"])
  244. elif message.is_hdr_space(usec) and not message.is_one_space(usec):
  245. if state != "HM":
  246. if binary_value:
  247. message.display_binary(binary_value)
  248. total_bits = total_bits + binary_value
  249. function_code.extend(message.add_data_code(binary_value))
  250. binary_value = add_bit(binary_value, "reset")
  251. output.write("UNEXPECTED->")
  252. state = "HS"
  253. output.write("kHdrSpace+")
  254. function_code.append(" space(kHdrSpace);")
  255. elif message.is_bit_mark(usec) and count % 2:
  256. if state != "HS" and state != "BS":
  257. output.write("kBitMark(UNEXPECTED)")
  258. state = "BM"
  259. elif message.is_zero_space(usec):
  260. if state != "BM":
  261. output.write("kZeroSpace(UNEXPECTED)")
  262. state = "BS"
  263. binary_value = add_bit(binary_value, 0, output)
  264. elif message.is_one_space(usec):
  265. if state != "BM":
  266. output.write("kOneSpace(UNEXPECTED)")
  267. state = "BS"
  268. binary_value = add_bit(binary_value, 1, output)
  269. elif message.is_gap(usec):
  270. if state != "BM":
  271. output.write("UNEXPECTED->")
  272. state = "GS"
  273. output.write("GAP(%d)" % usec)
  274. if binary_value:
  275. message.display_binary(binary_value)
  276. function_code.extend(message.add_data_code(binary_value))
  277. else:
  278. function_code.extend([" // Gap", " mark(kBitMark);"])
  279. function_code.append(" space(kSpaceGap);")
  280. total_bits = total_bits + binary_value
  281. binary_value = add_bit(binary_value, "reset")
  282. else:
  283. output.write("UNKNOWN(%d)" % usec)
  284. state = "UNK"
  285. count = count + 1
  286. if binary_value:
  287. message.display_binary(binary_value)
  288. function_code.extend(message.add_data_code(binary_value))
  289. function_code.extend([
  290. " space(100000); // A 100% made up guess of the gap"
  291. " between messages.", " }", "}"
  292. ])
  293. total_bits = total_bits + binary_value
  294. output.write("\nTotal Nr. of suspected bits: %d\n" % len(total_bits))
  295. defines.append("const uint16_t kXyzBits = %d;" % len(total_bits))
  296. if len(total_bits) > 64:
  297. defines.append("const uint16_t kXyzStateLength = %d;" %
  298. (len(total_bits) / 8))
  299. return total_bits
  300. def generate_irsend_code(defines, normal, bits_str, output=sys.stdout):
  301. """Output the estimated C++ code to reproduce the IR message."""
  302. output.write("\nGenerating a VERY rough code outline:\n\n"
  303. "// WARNING: This probably isn't directly usable."
  304. " It's a guide only.\n")
  305. for line in defines:
  306. output.write("%s\n" % line)
  307. if len(bits_str) > 64: # Will it fit in a uint64_t?
  308. output.write("// DANGER: More than 64 bits detected. A uint64_t for "
  309. "'data' won't work!\n")
  310. # Display the "normal" version's code incase there are some
  311. # oddities in it.
  312. for line in normal:
  313. output.write("%s\n" % line)
  314. if len(bits_str) > 64: # Will it fit in a uint64_t?
  315. output.write("\n\n// Alternative >64 bit Function\n"
  316. "void IRsend::sendXyz(uint8_t data[], uint16_t nbytes,"
  317. " uint16_t repeat) {\n"
  318. " // nbytes should typically be kXyzStateLength\n"
  319. " // data should typically be:\n"
  320. " // uint8_t data[kXyzStateLength] = {0x%s};\n"
  321. " // data[] is assumed to be in MSB order for this code.\n"
  322. " for (uint16_t r = 0; r <= repeat; r++) {\n"
  323. " sendGeneric(kHdrMark, kHdrSpace,\n"
  324. " kBitMark, kOneSpace,\n"
  325. " kBitMark, kZeroSpace,\n"
  326. " kBitMark,\n"
  327. " 100000, // 100%% made-up guess at the"
  328. " message gap.\n"
  329. " data, nbytes,\n"
  330. " 38000, // Complete guess of the modulation"
  331. " frequency.\n"
  332. " true, 0, 50);\n"
  333. " }\n"
  334. "}\n" % ", 0x".join("%02X" % int(bits_str[i:i + 8], 2)
  335. for i in range(0, len(bits_str), 8)))
  336. def main():
  337. """Parse the commandline arguments and call the method."""
  338. arg_parser = argparse.ArgumentParser(
  339. description="Read an IRremoteESP8266 rawData declaration and tries to "
  340. "analyse it.",
  341. formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  342. arg_parser.add_argument(
  343. "-g",
  344. "--code",
  345. action="store_true",
  346. default=False,
  347. dest="gen_code",
  348. help="Generate a C++ code outline to aid making an IRsend function.")
  349. arg_group = arg_parser.add_mutually_exclusive_group(required=True)
  350. arg_group.add_argument(
  351. "rawdata",
  352. help="A rawData line from IRrecvDumpV2. e.g. 'uint16_t rawbuf[37] = {"
  353. "7930, 3952, 494, 1482, 520, 1482, 494, 1508, 494, 520, 494, 1482, 494, "
  354. "520, 494, 1482, 494, 1482, 494, 3978, 494, 520, 494, 520, 494, 520, "
  355. "494, 520, 520, 520, 494, 520, 494, 520, 494, 520, 494};'",
  356. nargs="?")
  357. arg_group.add_argument(
  358. "-f", "--file", help="Read in a rawData line from the file.")
  359. arg_parser.add_argument(
  360. "-r",
  361. "--range",
  362. type=int,
  363. help="Max number of micro-seconds difference between values to consider"
  364. " it the same value.",
  365. dest="margin",
  366. default=200)
  367. arg_group.add_argument(
  368. "--stdin",
  369. help="Read in a rawData line from STDIN.",
  370. action="store_true",
  371. default=False)
  372. arg_options = arg_parser.parse_args()
  373. if arg_options.stdin:
  374. data = sys.stdin.read()
  375. elif arg_options.file:
  376. with open(arg_options.file) as input_file:
  377. data = input_file.read()
  378. else:
  379. data = arg_options.rawdata
  380. parse_and_report(data, arg_options.margin, arg_options.gen_code)
  381. if __name__ == '__main__':
  382. main()