import cv2
import pytesseract
import time
from datetime import datetime
from pytesseract import Output
import numpy as np
import re
import os
from influxdb import InfluxDBClient

# InfluxDB connection used to store the parsed readings.
client = InfluxDBClient(host="localhost", port=8086, username="influxdb", password="influxdbTSGAMES", timeout=120_000)
client.create_database("influxdb")
client.switch_database('influxdb')
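
# Optional sanity check (sketch, assuming the influxdb 5.x Python client):
# ping() returns the server version string and fails fast if InfluxDB is unreachable.
# print("InfluxDB version:", client.ping())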

class SolarMonitor:
    BRIGHTNESS_THRESHOLD = 50  # 0-255
    camera = None

    def __init__(self, test=False):
        self.test = test
        if not test:
            self.initCamera()

    def parse(self, img):
        # Seven-segment OCR via Tesseract with the "letsgodigital" traineddata:
        # https://github.com/adrianlazaro8/Tesseract_sevenSegmentsLetsGoDigital
        custom_oem = r'--oem 3 --psm 7'
        data = pytesseract.image_to_data(img, lang='lets', config=custom_oem, output_type=Output.DICT)
        # data = pytesseract.image_to_data(img, lang='letsgodigital', config=custom_oem, output_type=Output.DICT)
        # data = pytesseract.image_to_data(img, config=custom_oem, output_type=Output.DICT)
        print(data)
        results = []
        for i in range(len(data['text'])):
            text = data['text'][i].strip('.,-_')
            text = re.sub('[^0-9]', '', text)
            if text:
                results.append(text)
        if len(results) == 2:
            # The display shows one decimal place, so divide the digit string by 10
            # and discard readings outside the expected ranges (< 80 and < 900).
            results = [int(x) / 10. for x in results]
            if results[0] < 80 and results[1] < 900:
                return results
        return None
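
    # Illustrative example (assumed OCR output, not from a real capture):
    #   data['text'] -> ['', '24.5', '123.4']   two tokens read from the display
    #   digit-only strings -> ['245', '1234']
    #   parse() returns [24.5, 123.4]; anything failing the range check, or not
    #   yielding exactly two numbers, returns None.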

    def getMasked(self, image):
        pixel_values = image.reshape((-1, 1))
        pixel_values = np.float32(pixel_values)
        _, result = cv2.threshold(image, self.BRIGHTNESS_THRESHOLD, 255, cv2.THRESH_TRUNC)
        return result
        # NOTE: everything below the return above is unreachable; it is an earlier
        # k-means segmentation experiment kept for reference.
        # define stopping criteria
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 0.2)
        # number of clusters (K)
        k = 2
        _, labels, (centers) = cv2.kmeans(pixel_values, k, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS)
        # convert back to 8 bit values
        centers = np.uint8(centers)
        # flatten the labels array
        labels = labels.flatten()
        # convert all pixels to the color of the centroids
        segmented_image = centers[labels.flatten()]
        segmented_image = segmented_image.reshape(image.shape)

        # disable only cluster number 2 (turn those pixels black)
        masked_image = np.copy(image)
        # convert to the shape of a vector of pixel values
        masked_image = masked_image.reshape((-1, 1))
        # color (i.e. cluster) to disable
        cluster = 2
        masked_image[labels != cluster] = [0]
        # convert back to original shape
        masked_image = masked_image.reshape(image.shape)

        return masked_image

    def thresholding2(self, image):
        # image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        # 2x2 kernel for the (currently disabled) morphological experiments below.
        # The first erode/dilate parameter is the original image, the kernel is the
        # matrix the image is convolved with, and the iteration count determines how
        # strongly the image is eroded/dilated.
        kernel = np.ones((2, 2), np.uint8)

        # image = cv2.convertScaleAbs(image, alpha=10.0, beta=-700)
        # image = cv2.erode(image, kernel, iterations=1)
        # image = cv2.convertScaleAbs(image, alpha=2.0, beta=-50)
        # image = (255 - image)
        # image = self.getMasked(image)
        # image = cv2.dilate(image, kernel, iterations=1)
        image = cv2.medianBlur(image, 3)
        # thresh = cv2.adaptiveThreshold(image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 51, 1)
        # Contour detection experiments (disabled):
        # contours, hierarchy = cv2.findContours(image=thresh, mode=cv2.RETR_TREE, method=cv2.CHAIN_APPROX_NONE)
        # cv2.drawContours(image=image, contours=contours, contourIdx=-1, color=(255, 255, 255), thickness=1, lineType=cv2.LINE_AA)
        # im2, contours = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        # cv2.drawContours(image, contours, -1, (0, 255, 0), 3)
        # image = np.invert(image)
        return image
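
    # Alternative preprocessing (untested sketch, not part of the original pipeline):
    # Otsu thresholding on a grayscale copy is a common way to binarize LCD digits
    # and may OCR better than a median blur alone on this display.
    def thresholdingOtsu(self, image):
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        gray = cv2.medianBlur(gray, 3)
        _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        return binary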

    def crop(self, image):
        # Old axis-aligned crop kept for reference:
        # top = 384
        # left = 210
        # height = 60
        # width = 230
        # return image[top:(top + height), left:(left + width)]
        # Corners of the display region in the full frame (TL, BL, BR, TR).
        pt_TL = [260, 190]
        pt_BL = [260, 242]
        pt_BR = [500, 240]
        pt_TR = [500, 190]
        width_AD = np.sqrt(((pt_TL[0] - pt_TR[0]) ** 2) + ((pt_TL[1] - pt_TR[1]) ** 2))
        width_BC = np.sqrt(((pt_BL[0] - pt_BR[0]) ** 2) + ((pt_BL[1] - pt_BR[1]) ** 2))
        maxWidth = max(int(width_AD), int(width_BC))
        height_AB = np.sqrt(((pt_TL[0] - pt_BL[0]) ** 2) + ((pt_TL[1] - pt_BL[1]) ** 2))
        height_CD = np.sqrt(((pt_BR[0] - pt_TR[0]) ** 2) + ((pt_BR[1] - pt_TR[1]) ** 2))
        maxHeight = max(int(height_AB), int(height_CD))
        # Warp the quadrilateral onto an axis-aligned rectangle of the measured size.
        input_pts = np.float32([pt_TL, pt_BL, pt_BR, pt_TR])
        output_pts = np.float32([[0, 0],
                                 [0, maxHeight - 1],
                                 [maxWidth - 1, maxHeight - 1],
                                 [maxWidth - 1, 0]])
        M = cv2.getPerspectiveTransform(input_pts, output_pts)
        image = cv2.warpPerspective(image, M, (maxWidth, maxHeight), flags=cv2.INTER_LINEAR)
        return image
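
    # Optional debug helper (untested sketch): draw the hard-coded crop quadrilateral
    # onto a full frame to verify the corner points after the camera is repositioned.
    def drawCropRegion(self, image):
        pts = np.array([[260, 190], [260, 242], [500, 240], [500, 190]], dtype=np.int32).reshape((-1, 1, 2))
        return cv2.polylines(np.copy(image), [pts], isClosed=True, color=(0, 255, 0), thickness=2)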

    def initCamera(self):
        from picamera import PiCamera
        self.camera = PiCamera(
            resolution=(800, 608)
        )
        # Fixed white balance and exposure so readings stay comparable between frames.
        self.camera.awb_mode = 'off'
        self.camera.awb_gains = (1.5, 2.0)
        self.camera.shutter_speed = 10000
        self.camera.iso = 800
        self.camera.rotation = 0

    def capture(self):
        from picamera.array import PiRGBArray
        rawCapture = PiRGBArray(self.camera)
        # The "images/" directory must already exist; the timestamped name is only
        # used later as the prefix of the annotated output file.
        file = "images/" + str(datetime.now()) + ".jpg"
        self.camera.capture(rawCapture, format="bgr")
        img = rawCapture.array
        img = self.crop(img)
        print("Captured.")
        return [file, img]

    def writeData(self, results):
        # Write both readings as points of the "solar" measurement, tagged "U" and "W".
        client.write_points([
            {
                "measurement": "solar",
                "tags": {
                    "type": "U"
                },
                "fields": {
                    "value": results[0]
                }
            },
            {
                "measurement": "solar",
                "tags": {
                    "type": "W"
                },
                "fields": {
                    "value": results[1]
                }
            }
        ], time_precision='ms')
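
    # Example read-back query (sketch, assuming the influxdb 5.x Python client):
    # client.query('SELECT last("value") FROM "solar" GROUP BY "type"')
    # returns a ResultSet with the most recent reading per type tag.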

    def run(self):
        if self.test:
            # Test mode: run OCR over saved frames in tests/ instead of the camera.
            for file in os.listdir('tests'):
                img = cv2.imread('tests/' + file)
                img = self.thresholding2(img)
                results = self.parse(img)
                print(file)
                print(results)
                if results:
                    self.writeData(results)
                cv2.imwrite(file + '_r.jpg', img)
        else:
            [file, img] = self.capture()
            img = self.thresholding2(img)
            results = self.parse(img)
            print(results)
            if results:
                self.writeData(results)
            # Overlay the parsed values and save a small annotated copy for debugging.
            img = cv2.putText(img, str(results), (75, 22), cv2.FONT_HERSHEY_SIMPLEX, 0.25, (255, 255, 255), 1, cv2.LINE_AA)
            cv2.imwrite(file + '_r.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 35])
            time.sleep(5)

solar = SolarMonitor(test=False)
while True:
    solar.run()
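
# To check the OCR against the saved frames in tests/ instead of the live camera,
# construct with test=True (frames that parse successfully are still written to InfluxDB):
# solar = SolarMonitor(test=True)
# solar.run()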